Changeset 02d288d in flexoentity for flexoentity/flexo_entity.py
- Timestamp:
- 10/23/25 13:27:08 (3 months ago)
- Branches:
- master
- Children:
- 4ceca57
- Parents:
- 6a7dec1
- File:
-
- 1 edited
-
flexoentity/flexo_entity.py (modified) (8 diffs)
Legend:
- Unmodified
- Added
- Removed
-
flexoentity/flexo_entity.py
r6a7dec1 r02d288d 89 89 90 90 91 @dataclass 91 @dataclass(kw_only=True) 92 92 class FlexoEntity(ABC): 93 93 domain: str 94 94 etype: EntityType 95 95 state: EntityState 96 96 flexo_id: Optional[FlexOID] = field(default=None) 97 signature: str = field(default_factory=str) 98 origin: Optional[str] = field(default=None) 99 97 100 OID_PATTERN = re.compile( 98 101 r"^(?P<domain>[A-Z0-9]+)-(?P<etype>[A-Z]+)" … … 101 104 102 105 def __str__(self) -> str: 103 return f"{self.domain }-{self.etype}{self.date}-{self.unique_hash}@{self.version:03d}{self.state}"106 return f"{self.domain_code()}-{self.etype}{self.date}-{self.unique_hash}@{self.version:03d}{self.state}" 104 107 105 108 @classmethod … … 121 124 raise NotImplementedError("Subclasses must define text_seed property") 122 125 126 def canonical_seed(self) -> str: 127 return canonical_seed(self.text_seed) 128 123 129 @classmethod 124 130 @abstractmethod … … 127 133 raise NotImplementedError("Subclasses must implement default()") 128 134 135 def domain_code(self) -> str: 136 """Return canonical domain code for serialization and ID generation.""" 137 return self.domain.domain if hasattr(self.domain, "domain") else self.domain 138 129 139 def __post_init__(self): 130 """Generate ID and content fingerprint.""" 131 self.flexo_id = FlexOID.generate(self.domain, 140 """ 141 Generate ID and content fingerprint. 142 143 All entities must carry a `.domain` attribute exposing a domain code string. 144 This may be a `Domain` instance or a temporary wrapper used by the `Domain` 145 class itself to avoid circular initialization. 146 """ 147 148 self.flexo_id = FlexOID.generate(self.domain_code(), 132 149 self.etype.short(), 133 150 self.state.short(), 134 151 self.text_seed, 135 152 1) 153 seed = canonical_seed(self.text_seed) 154 self.signature = hashlib.blake2s(seed.encode("utf-8"), digest_size=8).hexdigest().upper() 136 155 137 156 def __str__(self): 138 157 return ( 139 158 f"{self.etype.name}({self.flexo_id}, {self.state.name}, " 140 f"sig={self.flexo_id.signature}..., v{self.version})" 141 ) 142 159 f"sig={self.signature}..., v{self.version})" 160 ) 143 161 def to_dict(self): 144 162 return { 145 "domain": self.domain ,163 "domain": self.domain_code(), 146 164 "etype": self.etype.name, 147 165 "state": self.state.name, 148 166 "flexo_id": str(self.flexo_id), 167 "signature": self.signature, 168 "origin": self.origin, 149 169 } 150 170 151 171 @classmethod 152 172 def from_dict(cls, data): 173 from flexoentity.domain import Domain # avoid circular import 174 domain_obj = Domain( 175 domain=data["domain"], 176 etype=EntityType.DOMAIN, 177 state=EntityState.DRAFT, # default when reconstructing context 178 ) 153 179 obj = cls( 154 data["domain"], 155 EntityType[data["etype"]], 156 data["text_seed"], 157 EntityState[data["state"]], 158 ) 159 obj.flexo_id = FlexOID(data["flexo_id"], data.get("signature", "")) 180 domain=domain_obj, 181 etype=EntityType[data["etype"]], 182 state=EntityState[data["state"]], 183 ) 184 obj.flexo_id = FlexOID.from_string(data["flexo_id"]) 185 obj.signature = data.get("signature", "") 186 obj.origin = data.get("origin") 160 187 return obj 161 188 162 189 def to_json(self, *, indent: int | None = None) -> str: 163 190 """Serialize entity (and its FlexOID) into JSON.""" … … 184 211 EntityState.PUBLISHED, 185 212 ) 186 187 # ─────────────────────────────────────────────────────────────── 213 def _update_signature(self) -> str: 214 """Always recompute the entity's content signature.""" 215 seed = self.canonical_seed() 216 return hashlib.blake2s(seed.encode("utf-8"), digest_size=8).hexdigest().upper() 217 188 218 def _update_fingerprint(self) -> bool: 189 """Recalculate fingerprint and return True if content changed.""" 190 # extract version from current flexo_id 191 new_oid = FlexOID.generate(self.domain, self.etype.short(), self.state.short(), self.text_seed, self.flexo_id.version) 192 if new_oid.signature != self.flexo_id.signature: 193 self.flexo_id = new_oid 219 """Update FlexOID if the content signature changed.""" 220 new_sig = self._update_signature() 221 if new_sig != self.signature: 222 self.signature = new_sig 223 self.flexo_id = FlexOID.generate(self.domain_code(), 224 self.etype.short(), 225 self.state.short(), 226 self.text_seed, 227 self.flexo_id.version) 194 228 return True 195 229 return False 196 230 197 231 # ─────────────────────────────────────────────────────────────── 198 232 def _transition(self, target_state: EntityState): … … 234 268 if self.state == EntityState.DRAFT: 235 269 new_version = self.flexo_id.version + 1 236 new_fid = FlexOID.generate( 237 self.domain, 270 new_fid = FlexOID.generate(self.domain_code(), 238 271 self.etype.short(), 239 272 EntityState.APPROVED.short(), … … 262 295 """Start new lineage when obsolete.""" 263 296 self.flexo_id = FlexOID.clone_new_base( 264 self.domain ,297 self.domain_code(), 265 298 self.etype.short(), 266 299 self.state.short(), … … 276 309 @staticmethod 277 310 def verify_integrity(entity) -> bool: 278 # --- inhaltlicher (kryptographischer) Check --- 279 # Hash ohne State, Signatur mit State 280 hash_seed = canonical_seed(f"{entity.domain}:{entity.etype.short()}:{entity.text_seed}") 281 sig_seed = f"{hash_seed}:{entity.state.short()}" 282 283 expected_sig = hashlib.blake2s(sig_seed.encode("utf-8"), digest_size=8).hexdigest().upper() 284 return expected_sig == entity.flexo_id.signature 311 """Verify that an entity’s content signature matches its actual content.""" 312 expected_sig = hashlib.blake2s( 313 canonical_seed(entity.text_seed).encode("utf-8"), digest_size=8 314 ).hexdigest().upper() 315 return expected_sig == entity.signature 285 316 286 317 def allowed_transitions(self) -> list[str]:
Note:
See TracChangeset
for help on using the changeset viewer.
