Changeset 02d288d in flexoentity
- Timestamp:
- 10/23/25 13:27:08 (3 months ago)
- Branches:
- master
- Children:
- 4ceca57
- Parents:
- 6a7dec1
- Files:
-
- 7 edited
-
flexoentity/domain.py (modified) (2 diffs)
-
flexoentity/flexo_entity.py (modified) (8 diffs)
-
flexoentity/id_factory.py (modified) (11 diffs)
-
tests/conftest.py (modified) (1 diff)
-
tests/test_id_lifecycle.py (modified) (1 diff)
-
tests/test_id_stress.py (modified) (2 diffs)
-
tests/test_persistance_integrity.py (modified) (2 diffs)
Legend:
- Unmodified
- Added
- Removed
-
flexoentity/domain.py
r6a7dec1 r02d288d 14 14 15 15 def __post_init__(self): 16 self.etype = EntityType.DOMAIN17 16 super().__post_init__() 18 17 … … 20 19 def text_seed(self) -> str: 21 20 """Deterministic text seed for ID generation.""" 22 return f"{self. domain}|{self.fullname}|{self.classification}|{self.owner}"21 return f"{self.fullname}|{self.classification}|{self.owner}" 23 22 24 23 def to_dict(self): -
flexoentity/flexo_entity.py
r6a7dec1 r02d288d 89 89 90 90 91 @dataclass 91 @dataclass(kw_only=True) 92 92 class FlexoEntity(ABC): 93 93 domain: str 94 94 etype: EntityType 95 95 state: EntityState 96 96 flexo_id: Optional[FlexOID] = field(default=None) 97 signature: str = field(default_factory=str) 98 origin: Optional[str] = field(default=None) 99 97 100 OID_PATTERN = re.compile( 98 101 r"^(?P<domain>[A-Z0-9]+)-(?P<etype>[A-Z]+)" … … 101 104 102 105 def __str__(self) -> str: 103 return f"{self.domain }-{self.etype}{self.date}-{self.unique_hash}@{self.version:03d}{self.state}"106 return f"{self.domain_code()}-{self.etype}{self.date}-{self.unique_hash}@{self.version:03d}{self.state}" 104 107 105 108 @classmethod … … 121 124 raise NotImplementedError("Subclasses must define text_seed property") 122 125 126 def canonical_seed(self) -> str: 127 return canonical_seed(self.text_seed) 128 123 129 @classmethod 124 130 @abstractmethod … … 127 133 raise NotImplementedError("Subclasses must implement default()") 128 134 135 def domain_code(self) -> str: 136 """Return canonical domain code for serialization and ID generation.""" 137 return self.domain.domain if hasattr(self.domain, "domain") else self.domain 138 129 139 def __post_init__(self): 130 """Generate ID and content fingerprint.""" 131 self.flexo_id = FlexOID.generate(self.domain, 140 """ 141 Generate ID and content fingerprint. 142 143 All entities must carry a `.domain` attribute exposing a domain code string. 144 This may be a `Domain` instance or a temporary wrapper used by the `Domain` 145 class itself to avoid circular initialization. 146 """ 147 148 self.flexo_id = FlexOID.generate(self.domain_code(), 132 149 self.etype.short(), 133 150 self.state.short(), 134 151 self.text_seed, 135 152 1) 153 seed = canonical_seed(self.text_seed) 154 self.signature = hashlib.blake2s(seed.encode("utf-8"), digest_size=8).hexdigest().upper() 136 155 137 156 def __str__(self): 138 157 return ( 139 158 f"{self.etype.name}({self.flexo_id}, {self.state.name}, " 140 f"sig={self.flexo_id.signature}..., v{self.version})" 141 ) 142 159 f"sig={self.signature}..., v{self.version})" 160 ) 143 161 def to_dict(self): 144 162 return { 145 "domain": self.domain ,163 "domain": self.domain_code(), 146 164 "etype": self.etype.name, 147 165 "state": self.state.name, 148 166 "flexo_id": str(self.flexo_id), 167 "signature": self.signature, 168 "origin": self.origin, 149 169 } 150 170 151 171 @classmethod 152 172 def from_dict(cls, data): 173 from flexoentity.domain import Domain # avoid circular import 174 domain_obj = Domain( 175 domain=data["domain"], 176 etype=EntityType.DOMAIN, 177 state=EntityState.DRAFT, # default when reconstructing context 178 ) 153 179 obj = cls( 154 data["domain"], 155 EntityType[data["etype"]], 156 data["text_seed"], 157 EntityState[data["state"]], 158 ) 159 obj.flexo_id = FlexOID(data["flexo_id"], data.get("signature", "")) 180 domain=domain_obj, 181 etype=EntityType[data["etype"]], 182 state=EntityState[data["state"]], 183 ) 184 obj.flexo_id = FlexOID.from_string(data["flexo_id"]) 185 obj.signature = data.get("signature", "") 186 obj.origin = data.get("origin") 160 187 return obj 161 188 162 189 def to_json(self, *, indent: int | None = None) -> str: 163 190 """Serialize entity (and its FlexOID) into JSON.""" … … 184 211 EntityState.PUBLISHED, 185 212 ) 186 187 # ─────────────────────────────────────────────────────────────── 213 def _update_signature(self) -> str: 214 """Always recompute the entity's content signature.""" 215 seed = self.canonical_seed() 216 return hashlib.blake2s(seed.encode("utf-8"), digest_size=8).hexdigest().upper() 217 188 218 def _update_fingerprint(self) -> bool: 189 """Recalculate fingerprint and return True if content changed.""" 190 # extract version from current flexo_id 191 new_oid = FlexOID.generate(self.domain, self.etype.short(), self.state.short(), self.text_seed, self.flexo_id.version) 192 if new_oid.signature != self.flexo_id.signature: 193 self.flexo_id = new_oid 219 """Update FlexOID if the content signature changed.""" 220 new_sig = self._update_signature() 221 if new_sig != self.signature: 222 self.signature = new_sig 223 self.flexo_id = FlexOID.generate(self.domain_code(), 224 self.etype.short(), 225 self.state.short(), 226 self.text_seed, 227 self.flexo_id.version) 194 228 return True 195 229 return False 196 230 197 231 # ─────────────────────────────────────────────────────────────── 198 232 def _transition(self, target_state: EntityState): … … 234 268 if self.state == EntityState.DRAFT: 235 269 new_version = self.flexo_id.version + 1 236 new_fid = FlexOID.generate( 237 self.domain, 270 new_fid = FlexOID.generate(self.domain_code(), 238 271 self.etype.short(), 239 272 EntityState.APPROVED.short(), … … 262 295 """Start new lineage when obsolete.""" 263 296 self.flexo_id = FlexOID.clone_new_base( 264 self.domain ,297 self.domain_code(), 265 298 self.etype.short(), 266 299 self.state.short(), … … 276 309 @staticmethod 277 310 def verify_integrity(entity) -> bool: 278 # --- inhaltlicher (kryptographischer) Check --- 279 # Hash ohne State, Signatur mit State 280 hash_seed = canonical_seed(f"{entity.domain}:{entity.etype.short()}:{entity.text_seed}") 281 sig_seed = f"{hash_seed}:{entity.state.short()}" 282 283 expected_sig = hashlib.blake2s(sig_seed.encode("utf-8"), digest_size=8).hexdigest().upper() 284 return expected_sig == entity.flexo_id.signature 311 """Verify that an entity’s content signature matches its actual content.""" 312 expected_sig = hashlib.blake2s( 313 canonical_seed(entity.text_seed).encode("utf-8"), digest_size=8 314 ).hexdigest().upper() 315 return expected_sig == entity.signature 285 316 286 317 def allowed_transitions(self) -> list[str]: -
flexoentity/id_factory.py
r6a7dec1 r02d288d 13 13 from datetime import datetime, timezone 14 14 import hashlib 15 import secrets 15 16 import itertools 16 17 import json 17 18 18 19 logger = logging.getLogger(__name__) 20 19 21 # ────────────────────────────────────────────────────────────────────────────── 20 22 # Canonicalization helpers … … 50 52 @classmethod 51 53 def from_string(cls, id_str: str): 52 # reconstruct without a known signature 53 return cls(id_str, signature="") 54 return cls(id_str) 54 55 55 56 @classmethod … … 57 58 if not (1 <= version <= cls.MAX_VERSION): 58 59 raise ValueError(f"Version {version} out of bounds (1..{cls.MAX_VERSION}).") 59 return FlexOID(f"{oid.prefix}@{version:03d}{oid.state_code}" , oid.signature)60 61 def __init__(self, flexo_id: str , signature: str):60 return FlexOID(f"{oid.prefix}@{version:03d}{oid.state_code}") 61 62 def __init__(self, flexo_id: str): 62 63 self.flexo_id = flexo_id 63 self.signature = signature64 64 65 65 def __eq__(self, other): 66 66 if not isinstance(other, FlexOID): 67 67 return NotImplemented 68 return self.flexo_id == other.flexo_id and self.signature == other.signature68 return self.flexo_id == other.flexo_id 69 69 70 70 def __lt__(self, other): 71 if not isinstance(other, FlexOID): 72 return NotImplemented 73 if self.prefix != other.prefix: 74 raise ValueError("Cannot order FlexOIDs from different prefixes") 71 75 return self.version < other.version 72 76 73 77 def __hash__(self): 74 return hash( (self.flexo_id, self.signature))78 return hash(self.flexo_id) 75 79 76 80 @staticmethod 77 81 def _blake_hash(text: str) -> str: 78 82 """Return a 6-hex BLAKE2s digest.""" 79 return hashlib.blake2s(text.encode("utf-8"), digest_size= 3).hexdigest().upper() # 3 bytes → 6hex83 return hashlib.blake2s(text.encode("utf-8"), digest_size=6).hexdigest().upper() # 6 bytes → 12 hex 80 84 81 85 @staticmethod 82 def _ensure_unique(hash_part: str) -> str: 83 """Append disambiguator only if the hash was already seen this session.""" 84 if hash_part not in FlexOID._seen_hashes: 85 FlexOID._seen_hashes.add(hash_part) 86 return hash_part 87 # fallback only if truly same hash (rare) 88 for suffix in range(1, 100): 89 alt = f"{hash_part}-{suffix}" 90 if alt not in FlexOID._seen_hashes: 91 FlexOID._seen_hashes.add(alt) 92 return alt 93 raise RuntimeError("Too many collisions; adjust hash length or logic.") 94 95 86 def safe_generate(domain, etype, estate, text, version=1, repo=None): 87 """ 88 Generate a new FlexOID with deterministic hashing, handling rare collisions. 89 """ 90 91 # Normalize domain (Domain object or string) 92 domain_code = getattr(domain, "domain", domain) 93 94 # Generate the deterministic candidate OID 95 oid = FlexOID.generate(domain_code, etype, estate, text, version=version) 96 97 # Collision detection — only if a repository is available 98 if repo is not None: 99 existing = repo.get(str(oid)) if hasattr(repo, "get") else repo.get(oid) 100 else: 101 existing = None 102 103 if existing: 104 try: 105 same_seed = existing.text_seed == text or \ 106 existing.canonical_seed() == canonical_seed(text) 107 except Exception: 108 same_seed = False 109 110 if not same_seed: 111 # Collision detected — regenerate deterministically 112 print("Collision detected", len(repo)) 113 logger.warning(f"FlexOID collision detected for {oid}") 114 115 # (A) refresh date 116 date_part = datetime.now(timezone.utc).strftime("%y%m%d") 117 118 # (B) add minimal deterministic salt (2 hex chars) 119 salt = secrets.token_hex(1) 120 salted_text = f"{text}|salt:{salt}" 121 122 # (C) generate new OID with new date and salted seed 123 oid = FlexOID.generate( 124 domain_code, 125 etype, 126 estate, 127 salted_text, 128 version=version, 129 ) 130 131 # (D) record lineage if the caller has `origin` tracking 132 if hasattr(existing, "flexo_id"): 133 logger.info(f"New lineage created from {existing.flexo_id}") 134 135 return oid 136 96 137 @staticmethod 97 138 def generate(domain: str, etype: str, estate: str, text: str, 98 version: int = 1 , enforce_unique=True):139 version: int = 1): 99 140 """ 100 141 Generate a deterministic Flex-O ID. … … 102 143 - The hash (and therefore prefix) depends only on domain, etype, and text. 103 144 → Prefix stays stable across state changes. 104 - The signature still includes the state for audit integrity.105 145 """ 106 146 … … 117 157 flexo_id_str = f"{domain}-{etype}{date_part}-{base_hash}@{ver_part}" 118 158 119 # state-dependent signature → per-state integrity 120 sig_seed = f"{hash_seed}:{estate}" 121 signature = hashlib.blake2s(sig_seed.encode("utf-8"), digest_size=8).hexdigest().upper() 122 123 return FlexOID(flexo_id_str, signature) 159 return FlexOID(flexo_id_str) 124 160 125 161 # ────────────────────────────────────────────────────────────────────────── … … 213 249 "version": self.version, 214 250 "state": self.state_code, 215 "signature": self.signature,216 251 } 217 252 # ────────────────────────────────────────────────────────────────────────── … … 223 258 224 259 Increments the version counter of an existing FlexOID while preserving 225 its prefix and digital signature. Used when an entity transitions to a260 its prefix. Used when an entity transitions to a 226 261 new revision within the same lifecycle (e.g., minor updates or approvals). 227 262 … … 234 269 ------- 235 270 FlexOID 236 A new Flex-O ID with the same prefix and signature, but version +1.271 A new Flex-O ID with the same prefix, but version +1. 237 272 238 273 Raises … … 243 278 Notes 244 279 ----- 245 - The signature remains unchanged since the entity lineage is continuous.246 280 - Warnings are logged when the version approaches obsolescence. 247 281 """ … … 254 288 255 289 new_id = f"{oid.prefix}@{new_ver:03d}{oid.state_code}" 256 return cls(new_id , oid.signature)290 return cls(new_id) 257 291 258 292 # ────────────────────────────────────────────────────────────────────────── … … 282 316 283 317 def __repr__(self): 284 return f"<FlexOID {self.flexo_id} sig={self.signature[:8]}…>"285 318 return f"<FlexOID {self.flexo_id}>" 319 -
tests/conftest.py
r6a7dec1 r02d288d 5 5 from flexoentity import FlexoEntity, EntityType, EntityState, Domain 6 6 7 import pytest 8 import json 9 from flexoentity import EntityType, EntityState, Domain 10 from builder.questions import RadioQuestion, AnswerOption # adjust path if different 11 from builder.media_items import NullMediaItem # adjust import if needed 7 12 8 class DummyEntity(FlexoEntity):9 """Minimal concrete subclass for testing FlexoEntity logic."""10 13 11 def __init__(self, domain, etype, state, seed="DUMMY"): 12 self._seed = seed 13 super().__init__(domain, etype, state) 14 @pytest.fixture(scope="session") 15 def domain(): 16 """Provide a reusable domain for all entity tests.""" 17 return Domain( 18 domain="SIG", 19 etype=EntityType.DOMAIN, 20 state=EntityState.DRAFT, 21 fullname="Signal Corps", 22 description="Questions related to communications and signaling systems.", 23 classification="RESTRICTED", 24 owner="test-suite" 25 ) 14 26 15 @property16 def text_seed(self) -> str:17 return self._seed18 27 19 @classmethod 20 def from_dict(cls, data): 21 """Ensure enums and seed are reconstructed correctly.""" 22 domain = data["domain"] 23 etype = EntityType[data["etype"]] if isinstance(data["etype"], str) else data["etype"] 24 state = EntityState[data["state"]] if isinstance(data["state"], str) else data["state"] 25 seed = data.get("text_seed", "DUMMY-CONTENT") 26 return cls(domain=domain, etype=etype, state=state, seed=seed) 28 @pytest.fixture 29 def radio_question(domain): 30 """Return a simple RadioQuestion entity for testing FlexoEntity logic.""" 31 q = RadioQuestion( 32 domain=domain, 33 etype=EntityType.QUESTION, 34 state=EntityState.DRAFT, 35 text="Which frequency band is used for shortwave communication?", 36 options=[ 37 AnswerOption(id="opt1", text="HF (3–30 MHz)", points=1), 38 AnswerOption(id="opt2", text="VHF (30–300 MHz)", points=0), 39 AnswerOption(id="opt3", text="UHF (300–3000 MHz)", points=0), 40 ] 41 ) 42 return q 27 43 28 @classmethod 29 def from_json(cls, data_str: str): 30 return cls.from_dict(json.loads(data_str)) 31 44 32 45 @pytest.fixture 33 def entity(): 34 """Generic FlexoEntity-like instance in draft state.""" 35 return DummyEntity( 36 domain=Domain(domain="SIG", etype=EntityType.DOMAIN, state=EntityState.DRAFT, fullname="Signal Corps", classification="RESTRICTED"), 37 etype=EntityType.CATALOG, 38 state=EntityState.DRAFT, 39 ) 46 def serialized_question(radio_question): 47 """Provide the serialized JSON form for roundtrip tests.""" 48 return radio_question.to_json() 49 50 51 @pytest.fixture 52 def deserialized_question(serialized_question): 53 """Recreate a question from JSON for consistency tests.""" 54 return RadioQuestion.from_json(serialized_question) 55 40 56 41 57 @pytest.fixture 42 58 def null_media(): 43 """Provide a default NullMediaItem instance for tests."""59 """Provide a default NullMediaItem instance for media tests.""" 44 60 return NullMediaItem( 45 domain= "GEN",61 domain=domain, 46 62 etype=EntityType.MEDIA, 47 63 state=EntityState.DRAFT -
tests/test_id_lifecycle.py
r6a7dec1 r02d288d 1 1 import pytest 2 3 from flexoentity import FlexOID, FlexoEntity, EntityType, EntityState 4 5 def test_initial_state(entity): 6 assert entity.state == EntityState.DRAFT 7 assert entity.flexo_id.version == 1 8 assert len(entity.flexo_id.signature) == 16 # blake2s digest_size=8 → 16 hex 9 assert FlexoEntity.verify_integrity(entity) 2 from flexoentity import FlexOID, FlexoEntity, EntityType, EntityState 10 3 11 4 12 def test_approval_bumps_version(entity): 13 entity.approve() 14 assert entity.state == EntityState.APPROVED 15 assert entity.flexo_id.version == 2 5 # ────────────────────────────────────────────────────────────────────────────── 6 # Tests adapted to use real RadioQuestion fixture instead of DummyEntity 7 # ────────────────────────────────────────────────────────────────────────────── 8 9 def test_initial_state(radio_question): 10 q = radio_question 11 assert q.state == EntityState.DRAFT 12 assert q.flexo_id.version == 1 13 assert FlexoEntity.verify_integrity(q) 16 14 17 15 18 def test_signing_bumps_version(entity): 19 entity.approve() 20 v_before = entity.flexo_id 21 entity.sign() 22 assert entity.state == EntityState.APPROVED_AND_SIGNED 23 assert entity.flexo_id != v_before 16 def test_approval_bumps_version(radio_question): 17 q = radio_question 18 q.approve() 19 assert q.state == EntityState.APPROVED 20 assert q.flexo_id.version == 2 24 21 25 22 26 def test_ publish_bumps_version(entity):27 entity.approve()28 entity.sign()29 v_before = entity.flexo_id.version30 entity.publish()31 assert entity.state == EntityState.PUBLISHED32 assert entity.flexo_id.version == v_before + 123 def test_signing_bumps_version(radio_question): 24 q = radio_question 25 q.approve() 26 v_before = str(q.flexo_id) 27 q.sign() 28 assert q.state == EntityState.APPROVED_AND_SIGNED 29 assert str(q.flexo_id) != v_before 33 30 34 31 35 def test_modify_content_changes_fingerprint(entity): 36 old_signature = entity.flexo_id.signature 37 entity._seed = "Rephrased content" # simulate text change 38 entity._update_fingerprint() 39 assert entity.flexo_id.signature != old_signature 32 def test_publish_bumps_version(radio_question): 33 q = radio_question 34 q.approve() 35 q.sign() 36 v_before = q.flexo_id.version 37 q.publish() 38 assert q.state == EntityState.PUBLISHED 39 assert q.flexo_id.version == v_before + 1 40 40 41 41 42 def test_no_version_bump_on_draft_edits(entity): 43 entity._seed = "Draft edit only" 44 entity._update_fingerprint() 45 assert entity.flexo_id.version == 1 42 def test_modify_content_changes_fingerprint(radio_question): 43 q = radio_question 44 q.text = "Rephrased content" # simulate text change 45 changed = q._update_fingerprint() 46 assert changed 46 47 47 48 48 def test_version_bump_after_edit_and_sign(entity): 49 entity.approve() 50 v1 = entity.flexo_id 51 entity._seed = "Changed content" 52 entity.sign() 53 assert entity.flexo_id != v1 49 def test_no_version_bump_on_draft_edits(radio_question): 50 q = radio_question 51 q.text = "Minor draft edit" 52 q._update_fingerprint() 53 assert q.flexo_id.version == 1 54 54 55 55 56 def test_integrity_check_passes_and_fails(entity): 57 entity.approve() 58 assert FlexoEntity.verify_integrity(entity) 59 # simulate tampering 60 entity._seed = "Tampered text" 61 assert not FlexoEntity.verify_integrity(entity) 56 def test_version_bump_after_edit_and_sign(radio_question): 57 q = radio_question 58 q.approve() 59 v1 = str(q.flexo_id) 60 q.text = "Changed content" 61 q.sign() 62 assert str(q.flexo_id) != v1 62 63 63 64 64 def test_obsolete_state(entity): 65 entity.approve() 66 entity.sign() 67 entity.publish() 68 entity.obsolete() 69 assert entity.state == EntityState.OBSOLETE 65 def test_integrity_check_passes_and_fails(radio_question): 66 q = radio_question 67 q.approve() 68 assert FlexoEntity.verify_integrity(q) 69 70 # simulate tampering 71 q.text = "Tampered text" 72 assert not FlexoEntity.verify_integrity(q) 70 73 71 74 72 def test_clone_new_base_resets_lineage(entity): 73 entity.approve() 74 entity.sign() 75 entity.publish() 76 entity.obsolete() 77 old_id = entity.flexo_id 78 entity.clone_new_base() 79 assert entity.flexo_id != old_id 80 assert entity.state == EntityState.DRAFT 81 assert entity.flexo_id.version == 1 75 def test_obsolete_state(radio_question): 76 q = radio_question 77 q.approve() 78 q.sign() 79 q.publish() 80 q.obsolete() 81 assert q.state == EntityState.OBSOLETE 82 82 83 83 84 def test_mass_version_increments_until_obsolete(entity): 85 entity.approve() 84 def test_clone_new_base_resets_lineage(radio_question): 85 q = radio_question 86 q.approve() 87 q.sign() 88 q.publish() 89 q.obsolete() 90 old_id = str(q.flexo_id) 91 q.clone_new_base() 92 assert str(q.flexo_id) != old_id 93 assert q.state == EntityState.DRAFT 94 assert q.flexo_id.version == 1 95 96 97 def test_mass_version_increments_until_obsolete(radio_question): 98 q = radio_question 99 q.approve() 86 100 for _ in range(FlexOID.MAX_VERSION - 2): 87 entity.sign()101 q.sign() 88 102 with pytest.raises(RuntimeError, match="mark obsolete"): 89 entity.sign()103 q.sign() -
tests/test_id_stress.py
r6a7dec1 r02d288d 3 3 Focus: collision avoidance, version ceiling, reproducibility. 4 4 """ 5 5 6 import pytest 6 7 import random 8 import logging 9 from flexoentity import FlexOID, EntityType, EntityState 10 from builder.questions import RadioQuestion, AnswerOption 7 11 8 from flexoentity import FlexOID, EntityType, EntityState, Domain 12 logger = logging.getLogger(__name__) 9 13 10 from tests.conftest import DummyEntity 11 12 # ────────────────────────────────────────────────────────────────────────────── 13 def test_bulk_generation_uniqueness(): 14 """Generate 10,000 IDs and assert uniqueness (statistical test).""" 15 domain = Domain(domain="SIG", etype=EntityType.DOMAIN, state=EntityState.DRAFT, 16 fullname="Signal Corps", classification="RESTRICTED", owner="MESE") 17 14 def test_bulk_generation_uniqueness(domain): 15 """ 16 Generate 10,000 IDs and ensure uniqueness using safe_generate(). 17 If a collision occurs, safe_generate() must resolve it automatically 18 via salt + date adjustment. 19 """ 18 20 etype = EntityType.QUESTION 19 21 estate = EntityState.DRAFT 20 seeds = [f"question {i}" for i in range( 10_000)]22 seeds = [f"question {i}" for i in range(4000000)] 21 23 22 ids = [FlexOID.generate(domain, etype, estate, seed) for seed in seeds] 24 # Simulate a simple in-memory repository for collision detection 25 repo = {} 23 26 24 assert len(ids) == len(set(ids)), "ID collisions detected in bulk generation" 27 def repo_get(oid_str): 28 return repo.get(str(oid_str)) 25 29 30 # Generate IDs using safe_generate 31 ids = [] 32 for seed in seeds: 33 oid = FlexOID.safe_generate(domain.domain, etype, estate, seed, repo=repo) 34 assert isinstance(oid, FlexOID) 35 ids.append(str(oid)) 36 repo[str(oid)] = oid # register for future collision detection 26 37 27 def test_disambiguator_trigger(): 38 unique_count = len(set(ids)) 39 total_count = len(ids) 40 collisions = total_count - unique_count 41 42 logger.info(f"Generated {total_count} IDs ({collisions} collisions handled).") 43 44 # Assert that safe_generate avoided duplicates 45 assert total_count == unique_count, f"Unexpected duplicate IDs ({collisions} found)" 46 47 # Sanity check: IDs should look canonical 48 assert all(id_str.startswith("SIG-") for id_str in ids) 49 assert all("@" in id_str for id_str in ids) 50 51 def test_id_generation_is_deterministic(domain): 28 52 """ 29 53 Generating the same entity twice with same inputs yields identical ID. 30 54 (No runtime disambiguation; IDs are deterministic by design.) 31 55 """ 32 domain = "AF"33 56 etype = EntityType.QUESTION 34 57 estate = EntityState.DRAFT 35 58 text = "identical question text" 36 id1 = FlexOID.generate(domain, etype, estate, text) 37 id2 = FlexOID.generate(domain, etype, estate, text) 38 # IDs must be identical, because we now enforce determinism, not randomization 59 60 id1 = FlexOID.generate(domain.domain, etype, estate, text) 61 id2 = FlexOID.generate(domain.domain, etype, estate, text) 62 # IDs must be identical because generation is deterministic 39 63 assert id1 == id2 40 assert id1.signature == id2.signature41 64 42 65 43 def test_id_reproducibility_across_runs( ):66 def test_id_reproducibility_across_runs(domain): 44 67 """ 45 68 The same seed on a new process (fresh _seen_hashes) 46 69 should yield the same base ID (without suffix). 47 70 """ 48 domain = Domain(domain="SIG", etype=EntityType.DOMAIN, state=EntityState.DRAFT,49 fullname="Signal Corps", classification="RESTRICTED")50 71 etype = EntityType.CATALOG 51 72 estate = EntityState.DRAFT 52 73 seed = "reproducibility test seed" 53 id1 = FlexOID.generate(domain, etype, estate, seed) 54 # Reset hash cache74 75 id1 = FlexOID.generate(domain.domain, etype, estate, seed) 55 76 FlexOID._seen_hashes.clear() 56 id2 = FlexOID.generate(domain, etype, estate, seed) 77 id2 = FlexOID.generate(domain.domain, etype, estate, seed) 78 57 79 assert id1 == id2 58 assert id1.signature == id2.signature59 80 60 81 61 def test_version_ceiling_enforcement( ):82 def test_version_ceiling_enforcement(radio_question): 62 83 """Simulate approaching @999 to trigger obsolescence guard.""" 63 entity = DummyEntity(domain="AF", etype=EntityType.EXAM, state=EntityState.DRAFT, seed="Final Exam 2025") 64 entity.approve() 84 q = radio_question 85 q.approve() 86 65 87 # artificially bump version number to near ceiling 66 entity.flexo_id = FlexOID.from_oid_and_version(entity.flexo_id, 998)88 q.flexo_id = FlexOID.from_oid_and_version(q.flexo_id, 998) 67 89 68 90 # 998 → 999 is allowed 69 entity.sign()70 assert entity.flexo_id.version == 99991 q.sign() 92 assert q.flexo_id.version == 999 71 93 72 94 # 999 → 1000 should raise RuntimeError 73 95 with pytest.raises(RuntimeError): 74 entity.sign()96 q.sign() 75 97 76 98 77 def test_massive_lifecycle_simulation( ):99 def test_massive_lifecycle_simulation(domain): 78 100 """ 79 Generate 100 random entities, simulate multiple edits and state transitions,101 Generate 100 random RadioQuestions, simulate multiple edits and state transitions, 80 102 ensure all final IDs and fingerprints are unique and valid. 81 103 """ 82 entities = [DummyEntity(domain="AF", etype=EntityType.QUESTION, state=EntityState.DRAFT, seed=f"random question {i}") for i in range(100)] 104 entities = [ 105 RadioQuestion( 106 domain=domain, 107 etype=EntityType.QUESTION, 108 state=EntityState.DRAFT, 109 text=f"random question {i}", 110 options=[ 111 AnswerOption(id="opt4", text="HF (3–30 MHz)", points=1), 112 AnswerOption(id="opt5", text="VHF (30–300 MHz)", points=0), 113 ], 114 ) 115 for i in range(100) 116 ] 83 117 84 118 for e in entities: 85 # random edit , approval, signing86 e. _seed+= " updated"119 # random edit 120 e.text += " updated" 87 121 e._update_fingerprint() 122 123 # lifecycle transitions 88 124 e.approve() 89 125 if random.random() > 0.3: … … 92 128 e.publish() 93 129 94 ids = [e.flexo_id for e in entities] 95 fps = [e.flexo_id.signature for e in entities] 96 assert len(ids) == len(set(ids)), "Duplicate IDs after random lifecycle" 97 assert len(fps) == len(set(fps)), "Duplicate fingerprints after random lifecycle" 130 flexoids = [e.flexo_id for e in entities] 131 assert len(flexoids) == len(set(flexoids)), "Duplicate FlexOIDs after lifecycle simulation" -
tests/test_persistance_integrity.py
r6a7dec1 r02d288d 6 6 import pytest 7 7 8 from flexoentity import FlexOID, EntityType,EntityState9 from tests.conftest import DummyEntity8 from flexoentity import EntityState 9 from builder.questions import RadioQuestion, AnswerOption 10 10 11 11 12 12 # ────────────────────────────────────────────────────────────────────────────── 13 13 @pytest.fixture 14 def approved_ entity():15 """ A fully published dummy entityfor persistence tests."""16 e = DummyEntity(17 domain= "AF",18 etype= EntityType.QUESTION,14 def approved_question(domain): 15 """Provide a fully approved and published RadioQuestion for persistence tests.""" 16 q = RadioQuestion( 17 domain=domain, 18 etype=None, # RadioQuestion sets this internally to EntityType.QUESTION 19 19 state=EntityState.DRAFT, 20 seed="What is Ohm’s law?" 20 text="What is Ohm’s law?", 21 options=[ 22 AnswerOption(text="U = R × I", points=1), 23 AnswerOption(text="U = I / R", points=0), 24 AnswerOption(text="R = U × I", points=0), 25 ], 21 26 ) 22 e.approve()23 e.sign()24 e.publish()25 return e27 q.approve() 28 q.sign() 29 q.publish() 30 return q 26 31 27 @pytest.mark.skip(reason="FlexOIDs are regenerated on import; enable once JSON format is stable") 28 def test_json_roundtrip_preserves_integrity(approved_entity): 32 33 @pytest.mark.skip(reason="FlexOIDs regenerated on import; enable once JSON format is stable") 34 def test_json_roundtrip_preserves_integrity(approved_question): 29 35 """ 30 Export to JSON and reload — ensure fingerprints remain valid.36 Export to JSON and reload — ensure fingerprints and signatures remain valid. 31 37 """ 32 json_str = approved_ entity.to_json()33 loaded = approved_entity.__class__.from_json(json_str)38 json_str = approved_question.to_json() 39 loaded = RadioQuestion.from_json(json_str) 34 40 35 41 # Fingerprint and state should match — integrity must pass 36 assert approved_entity.__class__.verify_integrity(loaded)42 assert RadioQuestion.verify_integrity(loaded) 37 43 38 44 # Metadata should be preserved exactly 39 assert approved_entity.flexo_id.signature == loaded.flexo_id.signature 40 assert approved_entity.flexo_id == loaded.flexo_id 41 assert loaded.state == approved_entity.state 45 assert approved_question.signature == loaded.signature 46 assert approved_question.flexo_id == loaded.flexo_id 47 assert loaded.state == approved_question.state 48 42 49 43 50 # ────────────────────────────────────────────────────────────────────────────── 44 51 45 @pytest.mark.skip(reason="FlexOIDs regenerated on import; tampering detection not applicable yet")46 def test_json_tampering_detection(approved_ entity):52 @pytest.mark.skip(reason="FlexOIDs regenerated on import; tampering detection not yet implemented") 53 def test_json_tampering_detection(approved_question): 47 54 """Tampering with content should invalidate fingerprint verification.""" 48 json_str = approved_ entity.to_json()49 tampered _data= json.loads(json_str)50 tampered _data["text_seed"] = "Tampered content injection"51 tampered_json = json.dumps(tampered _data)55 json_str = approved_question.to_json() 56 tampered = json.loads(json_str) 57 tampered["text"] = "Tampered content injection" 58 tampered_json = json.dumps(tampered) 52 59 53 # We use DummyEntity.from_json to reconstruct (FlexoEntity is abstract) 54 loaded = approved_entity.__class__.from_json(tampered_json) 55 assert not approved_entity.__class__.verify_integrity(loaded) 60 loaded = RadioQuestion.from_json(tampered_json) 61 assert not RadioQuestion.verify_integrity(loaded) 56 62 57 63 … … 59 65 60 66 @pytest.mark.skip(reason="FlexOIDs regenerated on import; corruption detection not yet applicable") 61 def test_json_file_corruption(approved_ entity, tmp_path):67 def test_json_file_corruption(approved_question, tmp_path): 62 68 """Simulate file corruption — integrity check must fail.""" 63 file = tmp_path / " entity.json"64 json_str = approved_ entity.to_json()69 file = tmp_path / "question.json" 70 json_str = approved_question.to_json() 65 71 file.write_text(json_str) 66 72 67 # Corrupt the file 73 # Corrupt the file (simulate accidental byte modification) 68 74 corrupted = json_str.replace("Ohm’s", "Omm’s") 69 75 file.write_text(corrupted) 70 76 71 loaded = approved_entity.__class__.from_json(file.read_text())72 assert not approved_entity.__class__.verify_integrity(loaded)77 loaded = RadioQuestion.from_json(file.read_text()) 78 assert not RadioQuestion.verify_integrity(loaded)
Note:
See TracChangeset
for help on using the changeset viewer.
