Changeset 02d288d in flexoentity


Ignore:
Timestamp:
10/23/25 13:27:08 (3 months ago)
Author:
Enrico Schwass <ennoausberlin@…>
Branches:
master
Children:
4ceca57
Parents:
6a7dec1
Message:

improve hash generation and collision handler - move signature from FlexOID to FlexoEntity

Files:
7 edited

Legend:

Unmodified
Added
Removed
  • flexoentity/domain.py

    r6a7dec1 r02d288d  
    1414
    1515    def __post_init__(self):
    16         self.etype = EntityType.DOMAIN
    1716        super().__post_init__()
    1817
     
    2019    def text_seed(self) -> str:
    2120        """Deterministic text seed for ID generation."""
    22         return f"{self.domain}|{self.fullname}|{self.classification}|{self.owner}"
     21        return f"{self.fullname}|{self.classification}|{self.owner}"
    2322
    2423    def to_dict(self):
  • flexoentity/flexo_entity.py

    r6a7dec1 r02d288d  
    8989
    9090
    91 @dataclass
     91@dataclass(kw_only=True)
    9292class FlexoEntity(ABC):
    9393    domain: str
    9494    etype: EntityType
    9595    state: EntityState
    96 
     96    flexo_id: Optional[FlexOID] = field(default=None)
     97    signature: str = field(default_factory=str)
     98    origin: Optional[str] = field(default=None)
     99   
    97100    OID_PATTERN = re.compile(
    98101        r"^(?P<domain>[A-Z0-9]+)-(?P<etype>[A-Z]+)"
     
    101104
    102105    def __str__(self) -> str:
    103         return f"{self.domain}-{self.etype}{self.date}-{self.unique_hash}@{self.version:03d}{self.state}"
     106        return f"{self.domain_code()}-{self.etype}{self.date}-{self.unique_hash}@{self.version:03d}{self.state}"
    104107
    105108    @classmethod
     
    121124        raise NotImplementedError("Subclasses must define text_seed property")
    122125
     126    def canonical_seed(self) -> str:
     127        return canonical_seed(self.text_seed)
     128
    123129    @classmethod
    124130    @abstractmethod
     
    127133        raise NotImplementedError("Subclasses must implement default()")
    128134
     135    def domain_code(self) -> str:
     136        """Return canonical domain code for serialization and ID generation."""
     137        return self.domain.domain if hasattr(self.domain, "domain") else self.domain
     138
    129139    def __post_init__(self):
    130         """Generate ID and content fingerprint."""
    131         self.flexo_id = FlexOID.generate(self.domain,
     140        """
     141        Generate ID and content fingerprint.
     142       
     143        All entities must carry a `.domain` attribute exposing a domain code string.
     144        This may be a `Domain` instance or a temporary wrapper used by the `Domain`
     145        class itself to avoid circular initialization.
     146        """
     147       
     148        self.flexo_id = FlexOID.generate(self.domain_code(),
    132149                                         self.etype.short(),
    133150                                         self.state.short(),
    134151                                         self.text_seed,
    135152                                         1)
     153        seed = canonical_seed(self.text_seed)
     154        self.signature = hashlib.blake2s(seed.encode("utf-8"), digest_size=8).hexdigest().upper()
    136155
    137156    def __str__(self):
    138157        return (
    139158            f"{self.etype.name}({self.flexo_id}, {self.state.name}, "
    140             f"sig={self.flexo_id.signature}..., v{self.version})"
    141         )
    142 
     159            f"sig={self.signature}..., v{self.version})"
     160        )
    143161    def to_dict(self):
    144162        return {
    145             "domain": self.domain,
     163            "domain": self.domain_code(),
    146164            "etype": self.etype.name,
    147165            "state": self.state.name,
    148166            "flexo_id": str(self.flexo_id),
     167            "signature": self.signature,
     168            "origin": self.origin,
    149169        }
    150 
     170   
    151171    @classmethod
    152172    def from_dict(cls, data):
     173        from flexoentity.domain import Domain  # avoid circular import
     174        domain_obj = Domain(
     175            domain=data["domain"],
     176            etype=EntityType.DOMAIN,
     177            state=EntityState.DRAFT,  # default when reconstructing context
     178        )
    153179        obj = cls(
    154             data["domain"],
    155             EntityType[data["etype"]],
    156             data["text_seed"],
    157             EntityState[data["state"]],
    158         )
    159         obj.flexo_id = FlexOID(data["flexo_id"], data.get("signature", ""))
     180            domain=domain_obj,
     181            etype=EntityType[data["etype"]],
     182            state=EntityState[data["state"]],
     183        )
     184        obj.flexo_id = FlexOID.from_string(data["flexo_id"])
     185        obj.signature = data.get("signature", "")
     186        obj.origin = data.get("origin")
    160187        return obj
    161 
     188     
    162189    def to_json(self, *, indent: int | None = None) -> str:
    163190        """Serialize entity (and its FlexOID) into JSON."""
     
    184211            EntityState.PUBLISHED,
    185212        )
    186    
    187     # ───────────────────────────────────────────────────────────────
     213    def _update_signature(self) -> str:
     214        """Always recompute the entity's content signature."""
     215        seed = self.canonical_seed()
     216        return hashlib.blake2s(seed.encode("utf-8"), digest_size=8).hexdigest().upper()
     217
    188218    def _update_fingerprint(self) -> bool:
    189         """Recalculate fingerprint and return True if content changed."""
    190         # extract version from current flexo_id
    191         new_oid = FlexOID.generate(self.domain, self.etype.short(), self.state.short(), self.text_seed, self.flexo_id.version)
    192         if new_oid.signature != self.flexo_id.signature:
    193             self.flexo_id = new_oid
     219        """Update FlexOID if the content signature changed."""
     220        new_sig = self._update_signature()
     221        if new_sig != self.signature:
     222            self.signature = new_sig
     223            self.flexo_id = FlexOID.generate(self.domain_code(),
     224                                             self.etype.short(),
     225                                             self.state.short(),
     226                                             self.text_seed,
     227                                             self.flexo_id.version)
    194228            return True
    195229        return False
    196 
     230   
    197231    # ───────────────────────────────────────────────────────────────
    198232    def _transition(self, target_state: EntityState):
     
    234268        if self.state == EntityState.DRAFT:
    235269            new_version = self.flexo_id.version + 1
    236             new_fid = FlexOID.generate(
    237                 self.domain,
     270            new_fid = FlexOID.generate(self.domain_code(),
    238271                self.etype.short(),
    239272                EntityState.APPROVED.short(),
     
    262295        """Start new lineage when obsolete."""
    263296        self.flexo_id = FlexOID.clone_new_base(
    264             self.domain,
     297            self.domain_code(),
    265298            self.etype.short(),
    266299            self.state.short(),
     
    276309    @staticmethod
    277310    def verify_integrity(entity) -> bool:
    278         # --- inhaltlicher (kryptographischer) Check ---
    279         # Hash ohne State, Signatur mit State
    280         hash_seed = canonical_seed(f"{entity.domain}:{entity.etype.short()}:{entity.text_seed}")
    281         sig_seed  = f"{hash_seed}:{entity.state.short()}"
    282 
    283         expected_sig = hashlib.blake2s(sig_seed.encode("utf-8"), digest_size=8).hexdigest().upper()
    284         return expected_sig == entity.flexo_id.signature
     311        """Verify that an entity’s content signature matches its actual content."""
     312        expected_sig = hashlib.blake2s(
     313            canonical_seed(entity.text_seed).encode("utf-8"), digest_size=8
     314        ).hexdigest().upper()
     315        return expected_sig == entity.signature
    285316
    286317    def allowed_transitions(self) -> list[str]:
  • flexoentity/id_factory.py

    r6a7dec1 r02d288d  
    1313from datetime import datetime, timezone
    1414import hashlib
     15import secrets
    1516import itertools
    1617import json
    1718
    1819logger = logging.getLogger(__name__)
     20
    1921# ──────────────────────────────────────────────────────────────────────────────
    2022#  Canonicalization helpers
     
    5052    @classmethod
    5153    def from_string(cls, id_str: str):
    52         # reconstruct without a known signature
    53         return cls(id_str, signature="")
     54        return cls(id_str)
    5455
    5556    @classmethod
     
    5758        if not (1 <= version <= cls.MAX_VERSION):
    5859            raise ValueError(f"Version {version} out of bounds (1..{cls.MAX_VERSION}).")
    59         return FlexOID(f"{oid.prefix}@{version:03d}{oid.state_code}", oid.signature)
    60 
    61     def __init__(self, flexo_id: str, signature: str):
     60        return FlexOID(f"{oid.prefix}@{version:03d}{oid.state_code}")
     61
     62    def __init__(self, flexo_id: str):
    6263        self.flexo_id = flexo_id
    63         self.signature = signature
    6464
    6565    def __eq__(self, other):
    6666        if not isinstance(other, FlexOID):
    6767            return NotImplemented
    68         return self.flexo_id == other.flexo_id and self.signature == other.signature
     68        return self.flexo_id == other.flexo_id
    6969
    7070    def __lt__(self, other):
     71        if not isinstance(other, FlexOID):
     72            return NotImplemented
     73        if self.prefix != other.prefix:
     74            raise ValueError("Cannot order FlexOIDs from different prefixes")
    7175        return self.version < other.version
    72 
     76       
    7377    def __hash__(self):
    74         return hash((self.flexo_id, self.signature))
     78        return hash(self.flexo_id)
    7579
    7680    @staticmethod
    7781    def _blake_hash(text: str) -> str:
    7882        """Return a 6-hex BLAKE2s digest."""
    79         return hashlib.blake2s(text.encode("utf-8"), digest_size=3).hexdigest().upper()  # 3 bytes → 6 hex
     83        return hashlib.blake2s(text.encode("utf-8"), digest_size=6).hexdigest().upper()  # 6 bytes → 12 hex
    8084
    8185    @staticmethod
    82     def _ensure_unique(hash_part: str) -> str:
    83         """Append disambiguator only if the hash was already seen this session."""
    84         if hash_part not in FlexOID._seen_hashes:
    85             FlexOID._seen_hashes.add(hash_part)
    86             return hash_part
    87         # fallback only if truly same hash (rare)
    88         for suffix in range(1, 100):
    89             alt = f"{hash_part}-{suffix}"
    90             if alt not in FlexOID._seen_hashes:
    91                 FlexOID._seen_hashes.add(alt)
    92                 return alt
    93         raise RuntimeError("Too many collisions; adjust hash length or logic.")
    94 
    95 
     86    def safe_generate(domain, etype, estate, text, version=1, repo=None):
     87        """
     88        Generate a new FlexOID with deterministic hashing, handling rare collisions.
     89        """
     90
     91        # Normalize domain (Domain object or string)
     92        domain_code = getattr(domain, "domain", domain)
     93
     94        # Generate the deterministic candidate OID
     95        oid = FlexOID.generate(domain_code, etype, estate, text, version=version)
     96
     97        # Collision detection — only if a repository is available
     98        if repo is not None:
     99            existing = repo.get(str(oid)) if hasattr(repo, "get") else repo.get(oid)
     100        else:
     101            existing = None
     102
     103        if existing:
     104            try:
     105                same_seed = existing.text_seed == text or \
     106                            existing.canonical_seed() == canonical_seed(text)
     107            except Exception:
     108                same_seed = False
     109
     110            if not same_seed:
     111                # Collision detected — regenerate with a random salt
     112                print("Collision detected", len(repo))
     113                logger.warning(f"FlexOID collision detected for {oid}")
     114
     115                # (A) refresh date
     116                date_part = datetime.now(timezone.utc).strftime("%y%m%d")
     117
     118                # (B) add minimal random salt (2 hex chars)
     119                salt = secrets.token_hex(1)
     120                salted_text = f"{text}|salt:{salt}"
     121
     122                # (C) generate new OID with new date and salted seed
     123                oid = FlexOID.generate(
     124                    domain_code,
     125                    etype,
     126                    estate,
     127                    salted_text,
     128                    version=version,
     129                )
     130
     131                # (D) record lineage if the caller has `origin` tracking
     132                if hasattr(existing, "flexo_id"):
     133                    logger.info(f"New lineage created from {existing.flexo_id}")
     134
     135        return oid
     136   
    96137    @staticmethod
    97138    def generate(domain: str, etype: str, estate: str, text: str,
    98              version: int = 1, enforce_unique=True):
     139             version: int = 1):
    99140        """
    100141        Generate a deterministic Flex-O ID.
     
    102143        - The hash (and therefore prefix) depends only on domain, etype, and text.
    103144        → Prefix stays stable across state changes.
    104         - The signature still includes the state for audit integrity.
    105145        """
    106146
     
    117157        flexo_id_str = f"{domain}-{etype}{date_part}-{base_hash}@{ver_part}"
    118158
    119         # state-dependent signature → per-state integrity
    120         sig_seed = f"{hash_seed}:{estate}"
    121         signature = hashlib.blake2s(sig_seed.encode("utf-8"), digest_size=8).hexdigest().upper()
    122 
    123         return FlexOID(flexo_id_str, signature)
     159        return FlexOID(flexo_id_str)
    124160
    125161    # ──────────────────────────────────────────────────────────────────────────
     
    213249            "version": self.version,
    214250            "state": self.state_code,
    215             "signature": self.signature,
    216251        }
    217252    # ──────────────────────────────────────────────────────────────────────────
     
    223258
    224259        Increments the version counter of an existing FlexOID while preserving
    225         its prefix and digital signature. Used when an entity transitions to a
     260        its prefix. Used when an entity transitions to a
    226261        new revision within the same lifecycle (e.g., minor updates or approvals).
    227262
     
    234269        -------
    235270        FlexOID
    236         A new Flex-O ID with the same prefix and signature, but version +1.
     271        A new Flex-O ID with the same prefix, but version +1.
    237272
    238273        Raises
     
    243278        Notes
    244279        -----
    245         - The signature remains unchanged since the entity lineage is continuous.
    246280        - Warnings are logged when the version approaches obsolescence.
    247281        """
     
    254288
    255289        new_id = f"{oid.prefix}@{new_ver:03d}{oid.state_code}"
    256         return cls(new_id, oid.signature)
     290        return cls(new_id)
    257291
    258292    # ──────────────────────────────────────────────────────────────────────────
     
    282316
    283317    def __repr__(self):
    284         return f"<FlexOID {self.flexo_id} sig={self.signature[:8]}…>"
    285 
     318        return f"<FlexOID {self.flexo_id}>"
     319
  • tests/conftest.py

    r6a7dec1 r02d288d  
    55from flexoentity import FlexoEntity, EntityType, EntityState, Domain
    66
     7import pytest
     8import json
     9from flexoentity import EntityType, EntityState, Domain
     10from builder.questions import RadioQuestion, AnswerOption  # adjust path if different
     11from builder.media_items import NullMediaItem  # adjust import if needed
    712
    8 class DummyEntity(FlexoEntity):
    9     """Minimal concrete subclass for testing FlexoEntity logic."""
    1013
    11     def __init__(self, domain, etype, state, seed="DUMMY"):
    12         self._seed = seed
    13         super().__init__(domain, etype, state)
     14@pytest.fixture(scope="session")
     15def domain():
     16    """Provide a reusable domain for all entity tests."""
     17    return Domain(
     18        domain="SIG",
     19        etype=EntityType.DOMAIN,
     20        state=EntityState.DRAFT,
     21        fullname="Signal Corps",
     22        description="Questions related to communications and signaling systems.",
     23        classification="RESTRICTED",
     24        owner="test-suite"
     25    )
    1426
    15     @property
    16     def text_seed(self) -> str:
    17         return self._seed
    1827
    19     @classmethod
    20     def from_dict(cls, data):
    21         """Ensure enums and seed are reconstructed correctly."""
    22         domain = data["domain"]
    23         etype = EntityType[data["etype"]] if isinstance(data["etype"], str) else data["etype"]
    24         state = EntityState[data["state"]] if isinstance(data["state"], str) else data["state"]
    25         seed = data.get("text_seed", "DUMMY-CONTENT")
    26         return cls(domain=domain, etype=etype, state=state, seed=seed)
     28@pytest.fixture
     29def radio_question(domain):
     30    """Return a simple RadioQuestion entity for testing FlexoEntity logic."""
     31    q = RadioQuestion(
     32        domain=domain,
     33        etype=EntityType.QUESTION,
     34        state=EntityState.DRAFT,
     35        text="Which frequency band is used for shortwave communication?",
     36        options=[
     37            AnswerOption(id="opt1", text="HF (3–30 MHz)", points=1),
     38            AnswerOption(id="opt2", text="VHF (30–300 MHz)", points=0),
     39            AnswerOption(id="opt3", text="UHF (300–3000 MHz)", points=0),
     40        ]
     41    )
     42    return q
    2743
    28     @classmethod
    29     def from_json(cls, data_str: str):
    30         return cls.from_dict(json.loads(data_str))
    31    
     44
    3245@pytest.fixture
    33 def entity():
    34     """Generic FlexoEntity-like instance in draft state."""
    35     return DummyEntity(
    36         domain=Domain(domain="SIG", etype=EntityType.DOMAIN, state=EntityState.DRAFT, fullname="Signal Corps", classification="RESTRICTED"),
    37         etype=EntityType.CATALOG,
    38         state=EntityState.DRAFT,
    39     )
     46def serialized_question(radio_question):
     47    """Provide the serialized JSON form for roundtrip tests."""
     48    return radio_question.to_json()
     49
     50
     51@pytest.fixture
     52def deserialized_question(serialized_question):
     53    """Recreate a question from JSON for consistency tests."""
     54    return RadioQuestion.from_json(serialized_question)
     55
    4056
    4157@pytest.fixture
    4258def null_media():
    43     """Provide a default NullMediaItem instance for tests."""
     59    """Provide a default NullMediaItem instance for media tests."""
    4460    return NullMediaItem(
    45         domain="GEN",
     61        domain=domain,
    4662        etype=EntityType.MEDIA,
    4763        state=EntityState.DRAFT
  • tests/test_id_lifecycle.py

    r6a7dec1 r02d288d  
    11import pytest
    2 
    3 from flexoentity import FlexOID, FlexoEntity, EntityType, EntityState
    4 
    5 def test_initial_state(entity):
    6     assert entity.state == EntityState.DRAFT
    7     assert entity.flexo_id.version == 1
    8     assert len(entity.flexo_id.signature) == 16  # blake2s digest_size=8 → 16 hex
    9     assert FlexoEntity.verify_integrity(entity)
     2from flexoentity import FlexOID, FlexoEntity, EntityType, EntityState
    103
    114
    12 def test_approval_bumps_version(entity):
    13     entity.approve()
    14     assert entity.state == EntityState.APPROVED
    15     assert entity.flexo_id.version == 2
     5# ──────────────────────────────────────────────────────────────────────────────
     6# Tests adapted to use real RadioQuestion fixture instead of DummyEntity
     7# ──────────────────────────────────────────────────────────────────────────────
     8
     9def test_initial_state(radio_question):
     10    q = radio_question
     11    assert q.state == EntityState.DRAFT
     12    assert q.flexo_id.version == 1
     13    assert FlexoEntity.verify_integrity(q)
    1614
    1715
    18 def test_signing_bumps_version(entity):
    19     entity.approve()
    20     v_before = entity.flexo_id
    21     entity.sign()
    22     assert entity.state == EntityState.APPROVED_AND_SIGNED
    23     assert entity.flexo_id != v_before
     16def test_approval_bumps_version(radio_question):
     17    q = radio_question
     18    q.approve()
     19    assert q.state == EntityState.APPROVED
     20    assert q.flexo_id.version == 2
    2421
    2522
    26 def test_publish_bumps_version(entity):
    27     entity.approve()
    28     entity.sign()
    29     v_before = entity.flexo_id.version
    30     entity.publish()
    31     assert entity.state == EntityState.PUBLISHED
    32     assert entity.flexo_id.version == v_before + 1
     23def test_signing_bumps_version(radio_question):
     24    q = radio_question
     25    q.approve()
     26    v_before = str(q.flexo_id)
     27    q.sign()
     28    assert q.state == EntityState.APPROVED_AND_SIGNED
     29    assert str(q.flexo_id) != v_before
    3330
    3431
    35 def test_modify_content_changes_fingerprint(entity):
    36     old_signature = entity.flexo_id.signature
    37     entity._seed = "Rephrased content"  # simulate text change
    38     entity._update_fingerprint()
    39     assert entity.flexo_id.signature != old_signature
     32def test_publish_bumps_version(radio_question):
     33    q = radio_question
     34    q.approve()
     35    q.sign()
     36    v_before = q.flexo_id.version
     37    q.publish()
     38    assert q.state == EntityState.PUBLISHED
     39    assert q.flexo_id.version == v_before + 1
    4040
    4141
    42 def test_no_version_bump_on_draft_edits(entity):
    43     entity._seed = "Draft edit only"
    44     entity._update_fingerprint()
    45     assert entity.flexo_id.version == 1
     42def test_modify_content_changes_fingerprint(radio_question):
     43    q = radio_question
     44    q.text = "Rephrased content"  # simulate text change
     45    changed = q._update_fingerprint()
     46    assert changed
    4647
    4748
    48 def test_version_bump_after_edit_and_sign(entity):
    49     entity.approve()
    50     v1 = entity.flexo_id
    51     entity._seed = "Changed content"
    52     entity.sign()
    53     assert entity.flexo_id != v1
     49def test_no_version_bump_on_draft_edits(radio_question):
     50    q = radio_question
     51    q.text = "Minor draft edit"
     52    q._update_fingerprint()
     53    assert q.flexo_id.version == 1
    5454
    5555
    56 def test_integrity_check_passes_and_fails(entity):
    57     entity.approve()
    58     assert FlexoEntity.verify_integrity(entity)
    59     # simulate tampering
    60     entity._seed = "Tampered text"
    61     assert not FlexoEntity.verify_integrity(entity)
     56def test_version_bump_after_edit_and_sign(radio_question):
     57    q = radio_question
     58    q.approve()
     59    v1 = str(q.flexo_id)
     60    q.text = "Changed content"
     61    q.sign()
     62    assert str(q.flexo_id) != v1
    6263
    6364
    64 def test_obsolete_state(entity):
    65     entity.approve()
    66     entity.sign()
    67     entity.publish()
    68     entity.obsolete()
    69     assert entity.state == EntityState.OBSOLETE
     65def test_integrity_check_passes_and_fails(radio_question):
     66    q = radio_question
     67    q.approve()
     68    assert FlexoEntity.verify_integrity(q)
     69
     70    # simulate tampering
     71    q.text = "Tampered text"
     72    assert not FlexoEntity.verify_integrity(q)
    7073
    7174
    72 def test_clone_new_base_resets_lineage(entity):
    73     entity.approve()
    74     entity.sign()
    75     entity.publish()
    76     entity.obsolete()
    77     old_id = entity.flexo_id
    78     entity.clone_new_base()
    79     assert entity.flexo_id != old_id
    80     assert entity.state == EntityState.DRAFT
    81     assert entity.flexo_id.version == 1
     75def test_obsolete_state(radio_question):
     76    q = radio_question
     77    q.approve()
     78    q.sign()
     79    q.publish()
     80    q.obsolete()
     81    assert q.state == EntityState.OBSOLETE
    8282
    8383
    84 def test_mass_version_increments_until_obsolete(entity):
    85     entity.approve()
     84def test_clone_new_base_resets_lineage(radio_question):
     85    q = radio_question
     86    q.approve()
     87    q.sign()
     88    q.publish()
     89    q.obsolete()
     90    old_id = str(q.flexo_id)
     91    q.clone_new_base()
     92    assert str(q.flexo_id) != old_id
     93    assert q.state == EntityState.DRAFT
     94    assert q.flexo_id.version == 1
     95
     96
     97def test_mass_version_increments_until_obsolete(radio_question):
     98    q = radio_question
     99    q.approve()
    86100    for _ in range(FlexOID.MAX_VERSION - 2):
    87         entity.sign()
     101        q.sign()
    88102    with pytest.raises(RuntimeError, match="mark obsolete"):
    89         entity.sign()
     103        q.sign()
  • tests/test_id_stress.py

    r6a7dec1 r02d288d  
    33Focus: collision avoidance, version ceiling, reproducibility.
    44"""
     5
    56import pytest
    67import random
     8import logging
     9from flexoentity import FlexOID, EntityType, EntityState
     10from builder.questions import RadioQuestion, AnswerOption
    711
    8 from flexoentity import FlexOID, EntityType, EntityState, Domain
     12logger = logging.getLogger(__name__)
    913
    10 from tests.conftest import DummyEntity
    11 
    12 # ──────────────────────────────────────────────────────────────────────────────
    13 def test_bulk_generation_uniqueness():
    14     """Generate 10,000 IDs and assert uniqueness (statistical test)."""
    15     domain = Domain(domain="SIG", etype=EntityType.DOMAIN, state=EntityState.DRAFT,
    16                     fullname="Signal Corps", classification="RESTRICTED", owner="MESE")
    17 
     14def test_bulk_generation_uniqueness(domain):
     15    """
     16    Generate 10,000 IDs and ensure uniqueness using safe_generate().
     17    If a collision occurs, safe_generate() must resolve it automatically
     18    via salt + date adjustment.
     19    """
    1820    etype = EntityType.QUESTION
    1921    estate = EntityState.DRAFT
    20     seeds = [f"question {i}" for i in range(10_000)]
     22    seeds = [f"question {i}" for i in range(4000000)]
    2123
    22     ids = [FlexOID.generate(domain, etype, estate, seed) for seed in seeds]
     24    # Simulate a simple in-memory repository for collision detection
     25    repo = {}
    2326
    24     assert len(ids) == len(set(ids)), "ID collisions detected in bulk generation"
     27    def repo_get(oid_str):
     28        return repo.get(str(oid_str))
    2529
     30    # Generate IDs using safe_generate
     31    ids = []
     32    for seed in seeds:
     33        oid = FlexOID.safe_generate(domain.domain, etype, estate, seed, repo=repo)
     34        assert isinstance(oid, FlexOID)
     35        ids.append(str(oid))
     36        repo[str(oid)] = oid  # register for future collision detection
    2637
    27 def test_disambiguator_trigger():
     38    unique_count = len(set(ids))
     39    total_count = len(ids)
     40    collisions = total_count - unique_count
     41
     42    logger.info(f"Generated {total_count} IDs ({collisions} collisions handled).")
     43
     44    # Assert that safe_generate avoided duplicates
     45    assert total_count == unique_count, f"Unexpected duplicate IDs ({collisions} found)"
     46
     47    # Sanity check: IDs should look canonical
     48    assert all(id_str.startswith("SIG-") for id_str in ids)
     49    assert all("@" in id_str for id_str in ids)
     50
     51def test_id_generation_is_deterministic(domain):
    2852    """
    2953    Generating the same entity twice with same inputs yields identical ID.
    3054    (No runtime disambiguation; IDs are deterministic by design.)
    3155    """
    32     domain = "AF"
    3356    etype = EntityType.QUESTION
    3457    estate = EntityState.DRAFT
    3558    text = "identical question text"
    36     id1 = FlexOID.generate(domain, etype, estate, text)
    37     id2 = FlexOID.generate(domain, etype, estate, text)
    38     # IDs must be identical, because we now enforce determinism, not randomization
     59
     60    id1 = FlexOID.generate(domain.domain, etype, estate, text)
     61    id2 = FlexOID.generate(domain.domain, etype, estate, text)
     62    # IDs must be identical because generation is deterministic
    3963    assert id1 == id2
    40     assert id1.signature == id2.signature
    4164
    4265
    43 def test_id_reproducibility_across_runs():
     66def test_id_reproducibility_across_runs(domain):
    4467    """
    4568    The same seed on a new process (fresh _seen_hashes)
    4669    should yield the same base ID (without suffix).
    4770    """
    48     domain = Domain(domain="SIG", etype=EntityType.DOMAIN, state=EntityState.DRAFT,
    49                     fullname="Signal Corps", classification="RESTRICTED")
    5071    etype = EntityType.CATALOG
    5172    estate = EntityState.DRAFT
    5273    seed = "reproducibility test seed"
    53     id1 = FlexOID.generate(domain, etype, estate, seed)
    54     # Reset hash cache
     74
     75    id1 = FlexOID.generate(domain.domain, etype, estate, seed)
    5576    FlexOID._seen_hashes.clear()
    56     id2 = FlexOID.generate(domain, etype, estate, seed)
     77    id2 = FlexOID.generate(domain.domain, etype, estate, seed)
     78
    5779    assert id1 == id2
    58     assert id1.signature == id2.signature
    5980
    6081
    61 def test_version_ceiling_enforcement():
     82def test_version_ceiling_enforcement(radio_question):
    6283    """Simulate approaching @999 to trigger obsolescence guard."""
    63     entity = DummyEntity(domain="AF", etype=EntityType.EXAM, state=EntityState.DRAFT, seed="Final Exam 2025")
    64     entity.approve()
     84    q = radio_question
     85    q.approve()
     86
    6587    # artificially bump version number to near ceiling
    66     entity.flexo_id = FlexOID.from_oid_and_version(entity.flexo_id, 998)
     88    q.flexo_id = FlexOID.from_oid_and_version(q.flexo_id, 998)
    6789
    6890    # 998 → 999 is allowed
    69     entity.sign()
    70     assert entity.flexo_id.version == 999
     91    q.sign()
     92    assert q.flexo_id.version == 999
    7193
    7294    # 999 → 1000 should raise RuntimeError
    7395    with pytest.raises(RuntimeError):
    74         entity.sign()
     96        q.sign()
    7597
    7698
    77 def test_massive_lifecycle_simulation():
     99def test_massive_lifecycle_simulation(domain):
    78100    """
    79     Generate 100 random entities, simulate multiple edits and state transitions,
     101    Generate 100 random RadioQuestions, simulate multiple edits and state transitions,
    80102    ensure all final IDs and fingerprints are unique and valid.
    81103    """
    82     entities = [DummyEntity(domain="AF", etype=EntityType.QUESTION, state=EntityState.DRAFT, seed=f"random question {i}") for i in range(100)]
     104    entities = [
     105        RadioQuestion(
     106            domain=domain,
     107            etype=EntityType.QUESTION,
     108            state=EntityState.DRAFT,
     109            text=f"random question {i}",
     110            options=[
     111                AnswerOption(id="opt4", text="HF (3–30 MHz)", points=1),
     112                AnswerOption(id="opt5", text="VHF (30–300 MHz)", points=0),
     113            ],
     114        )
     115        for i in range(100)
     116    ]
    83117
    84118    for e in entities:
    85         # random edit, approval, signing
    86         e._seed += " updated"
     119        # random edit
     120        e.text += " updated"
    87121        e._update_fingerprint()
     122
     123        # lifecycle transitions
    88124        e.approve()
    89125        if random.random() > 0.3:
     
    92128            e.publish()
    93129
    94     ids = [e.flexo_id for e in entities]
    95     fps = [e.flexo_id.signature for e in entities]
    96     assert len(ids) == len(set(ids)), "Duplicate IDs after random lifecycle"
    97     assert len(fps) == len(set(fps)), "Duplicate fingerprints after random lifecycle"
     130    flexoids = [e.flexo_id for e in entities]
     131    assert len(flexoids) == len(set(flexoids)), "Duplicate FlexOIDs after lifecycle simulation"
  • tests/test_persistance_integrity.py

    r6a7dec1 r02d288d  
    66import pytest
    77
    8 from flexoentity import FlexOID, EntityType, EntityState
    9 from tests.conftest import DummyEntity
     8from flexoentity import EntityState
     9from builder.questions import RadioQuestion, AnswerOption
    1010
    1111
    1212# ──────────────────────────────────────────────────────────────────────────────
    1313@pytest.fixture
    14 def approved_entity():
    15     """A fully published dummy entity for persistence tests."""
    16     e = DummyEntity(
    17         domain="AF",
    18         etype=EntityType.QUESTION,
     14def approved_question(domain):
     15    """Provide a fully approved and published RadioQuestion for persistence tests."""
     16    q = RadioQuestion(
     17        domain=domain,
     18        etype=None,  # RadioQuestion sets this internally to EntityType.QUESTION
    1919        state=EntityState.DRAFT,
    20         seed="What is Ohm’s law?"
     20        text="What is Ohm’s law?",
     21        options=[
     22            AnswerOption(text="U = R × I", points=1),
     23            AnswerOption(text="U = I / R", points=0),
     24            AnswerOption(text="R = U × I", points=0),
     25        ],
    2126    )
    22     e.approve()
    23     e.sign()
    24     e.publish()
    25     return e
     27    q.approve()
     28    q.sign()
     29    q.publish()
     30    return q
    2631
    27 @pytest.mark.skip(reason="FlexOIDs are regenerated on import; enable once JSON format is stable")
    28 def test_json_roundtrip_preserves_integrity(approved_entity):
     32
     33@pytest.mark.skip(reason="FlexOIDs regenerated on import; enable once JSON format is stable")
     34def test_json_roundtrip_preserves_integrity(approved_question):
    2935    """
    30     Export to JSON and reload — ensure fingerprints remain valid.
     36    Export to JSON and reload — ensure fingerprints and signatures remain valid.
    3137    """
    32     json_str = approved_entity.to_json()
    33     loaded = approved_entity.__class__.from_json(json_str)
     38    json_str = approved_question.to_json()
     39    loaded = RadioQuestion.from_json(json_str)
    3440
    3541    # Fingerprint and state should match — integrity must pass
    36     assert approved_entity.__class__.verify_integrity(loaded)
     42    assert RadioQuestion.verify_integrity(loaded)
    3743
    3844    # Metadata should be preserved exactly
    39     assert approved_entity.flexo_id.signature == loaded.flexo_id.signature
    40     assert approved_entity.flexo_id == loaded.flexo_id
    41     assert loaded.state == approved_entity.state
     45    assert approved_question.signature == loaded.signature
     46    assert approved_question.flexo_id == loaded.flexo_id
     47    assert loaded.state == approved_question.state
     48
    4249
    4350# ──────────────────────────────────────────────────────────────────────────────
    4451
    45 @pytest.mark.skip(reason="FlexOIDs regenerated on import; tampering detection not applicable yet")
    46 def test_json_tampering_detection(approved_entity):
     52@pytest.mark.skip(reason="FlexOIDs regenerated on import; tampering detection not yet implemented")
     53def test_json_tampering_detection(approved_question):
    4754    """Tampering with content should invalidate fingerprint verification."""
    48     json_str = approved_entity.to_json()
    49     tampered_data = json.loads(json_str)
    50     tampered_data["text_seed"] = "Tampered content injection"
    51     tampered_json = json.dumps(tampered_data)
     55    json_str = approved_question.to_json()
     56    tampered = json.loads(json_str)
     57    tampered["text"] = "Tampered content injection"
     58    tampered_json = json.dumps(tampered)
    5259
    53     # We use DummyEntity.from_json to reconstruct (FlexoEntity is abstract)
    54     loaded = approved_entity.__class__.from_json(tampered_json)
    55     assert not approved_entity.__class__.verify_integrity(loaded)
     60    loaded = RadioQuestion.from_json(tampered_json)
     61    assert not RadioQuestion.verify_integrity(loaded)
    5662
    5763
     
    5965
    6066@pytest.mark.skip(reason="FlexOIDs regenerated on import; corruption detection not yet applicable")
    61 def test_json_file_corruption(approved_entity, tmp_path):
     67def test_json_file_corruption(approved_question, tmp_path):
    6268    """Simulate file corruption — integrity check must fail."""
    63     file = tmp_path / "entity.json"
    64     json_str = approved_entity.to_json()
     69    file = tmp_path / "question.json"
     70    json_str = approved_question.to_json()
    6571    file.write_text(json_str)
    6672
    67     # Corrupt the file
     73    # Corrupt the file (simulate accidental byte modification)
    6874    corrupted = json_str.replace("Ohm’s", "Omm’s")
    6975    file.write_text(corrupted)
    7076
    71     loaded = approved_entity.__class__.from_json(file.read_text())
    72     assert not approved_entity.__class__.verify_integrity(loaded)
     77    loaded = RadioQuestion.from_json(file.read_text())
     78    assert not RadioQuestion.verify_integrity(loaded)
Note: See TracChangeset for help on using the changeset viewer.