Changeset ef964d8 in flexoentity


Ignore:
Timestamp:
11/27/25 18:12:23 (7 weeks ago)
Author:
Enrico Schwass <ennoausberlin@…>
Branches:
master
Children:
4e11d58
Parents:
9a50e0b
Message:

new serialization structure adopted and tests fixed

Files:
9 edited

Legend:

Unmodified
Added
Removed
  • flexoentity/domain.py

    r9a50e0b ref964d8  
    1 from uuid import UUID
    21from dataclasses import dataclass
    3 from flexoentity import FlexOID, FlexoEntity, EntityType
     2from flexoentity import FlexoEntity, EntityType
     3
    44
    55@dataclass
     
    2626        return self.domain_id
    2727
     28    def _deserialize_content(self, content: dict):
     29        self.fullname = content.get("fullname", "")
     30        self.description = content.get("description", "")
     31        self.classification = content.get("classification", "UNCLASSIFIED")
     32
    2833    def _serialize_content(self):
    2934        return {
     
    3237            "classification": self.classification,
    3338        }
    34    
     39
    3540    @classmethod
    3641    def from_dict(cls, data):
    37         # Must have flexo_id
    38         if "flexo_id" not in data:
    39             raise ValueError("Domain serialization missing 'flexo_id'.")
    4042
    41         flexo_id = FlexOID(data["flexo_id"])
    42 
    43         obj = cls(
    44             fullname=data.get("fullname", ""),
    45             description=data.get("description", ""),
    46             classification=data.get("classification", "UNCLASSIFIED"),
    47             flexo_id=flexo_id,
    48             _in_factory=True
    49         )
    50 
    51         # Restore metadata
    52         obj.origin = data.get("origin")
    53         obj.fingerprint = data.get("fingerprint", "")
    54         obj.originator_id = (
    55             UUID(data["originator_id"]) if data.get("originator_id") else None
    56         )
    57         obj.owner_id = (
    58             UUID(data["owner_id"]) if data.get("owner_id") else None
    59         )
    60 
     43        obj = super().from_dict(data)
     44        content = data.get("content", {})
     45        obj._deserialize_content(content)
    6146        return obj
  • flexoentity/flexo_entity.py

    r9a50e0b ref964d8  
    6060
    6161
    62 from flexoentity.id_factory import FlexOID
    63 from flexoentity import canonical_seed
     62from flexoentity.id_factory import FlexOID, canonical_seed
     63from flexoentity.logger import logger
    6464
    6565
     
    170170        return copy
    171171
     172    @staticmethod
     173    def canonicalize_content_dict(data) -> str:
     174        """
     175        Canonicalize structured content for fingerprinting.
     176        - Stable JSON representation
     177        - Sorted keys
     178        - No whitespace noise
     179        """
     180        return json.dumps(data, sort_keys=True, separators=(",", ":"))
     181
    172182    @property
    173183    @abstractmethod
     
    190200        """I return the entity type derived from my FlexOID"""
    191201        return EntityType(self.flexo_id.entity_type)
    192 
    193     def canonical_seed(self) -> str:
    194         """
    195         I use a helper method to flatten my text_seed.
    196         NOTE:This might become superfluous when text_seed provides this by itself. Redesign possible
    197         """
    198         return canonical_seed(self.text_seed)
    199202
    200203    @classmethod
     
    241244        if not self.subtype:
    242245            self.subtype = self.__class__.__name__
    243            
     246
    244247    def __str__(self):
    245248        return (
     
    258261    }
    259262
    260     @abstractmethod
    261     def _serialize_content(self):
    262         return {}
    263 
    264263    def to_dict(self):
    265264        return {
     
    276275        - ALWAYS restores the canonical Domain via DomainManager.
    277276        """
    278         if "flexo_id" not in data["meta"]:
    279             raise ValueError("Serialized entity must include 'flexo_id'.")
    280 
    281         flexo_id = FlexOID(data["meta"]["flexo_id"])
    282         subtype = data["subtype"]
    283         # canonical domain object
     277        meta = data.get("meta", "")
     278        # if not meta or meta.get("flexo_id", ""):
     279        #    raise ValueError("Serialized entity must include 'flexo_id'.")
     280        flexo_id = FlexOID(meta.get("flexo_id"))
     281        subtype = meta.get("subtype")
     282        if not subtype:
     283            raise ValueError("Serialized entity must include 'subtype'.")
     284
     285        try:
     286            owner_id = UUID(meta.get("owner_id"))
     287        except ValueError as e:
     288            logger.warn(f"Missing or wrong owner_id {e}")
     289            owner_id = UUID(int=0)
     290        try:
     291            originator_id = UUID(meta.get("originator_id"))
     292        except ValueError as e:
     293            logger.warn(f"Missing or wrong originator_id {e}")
     294            originator_id = UUID(int=0)
    284295
    285296        obj = cls(
    286297            flexo_id=flexo_id,
    287298            subtype=subtype,
     299            origin=meta.get("origin", ""),
     300            fingerprint=meta.get("fingerprint", ""),
     301            owner_id=owner_id,
     302            originator_id=originator_id,
    288303            _in_factory=True
    289304        )
    290 
    291         # restore provenance
    292         obj.fingerprint = data.get("fingerprint", "")
    293         obj.origin = data.get("origin")
    294 
    295         if data.get("originator_id"):
    296             obj.originator_id = UUID(data["originator_id"])
    297 
    298         if data.get("owner_id"):
    299             obj.owner_id = UUID(data["owner_id"])
    300 
     305        obj._deserialize_content(data)
    301306        return obj
    302307
     
    311316        return cls.from_dict(data)
    312317
     318    def _deserialize_content(self, content_dict):
     319        """Subclasses override this to restore real fields."""
     320        pass  # default: do nothing
     321
     322
     323    def _serialize_content(self):
     324        """
     325        Subclasses override to return all integrity-relevant fields.
     326        Should include:
     327        - all actual structured data
     328        - comment (if meaningful)
     329        - signature_data / certificate info (if present)
     330        """
     331        return {}
     332
     333    def canonical_content(self) -> str:
     334        """Get canonical JSON string of content for fingerprinting."""
     335        return self.canonicalize_content_dict(self._serialize_content())
     336
    313337    def _compute_fingerprint(self) -> str:
    314338        """I recompute the entity's content fingerprint."""
    315         seed = self.canonical_seed()
    316         return hashlib.blake2s(seed.encode("utf-8"), digest_size=8).hexdigest().upper()
     339        return hashlib.blake2s(self.canonical_content().encode("utf-8"), digest_size=8).hexdigest().upper()
    317340
    318341    def _update_fingerprint(self) -> bool:
    319         """
    320         I update FlexOID if the content fingerprint changed.
    321         """
    322342        new_fp = self._compute_fingerprint()
    323         if new_fp != self.fingerprint:
    324             self.fingerprint = new_fp
    325             self.flexo_id = FlexOID.safe_generate(self.domain_id,
    326                                                   self.entity_type.value,
    327                                                   self.state.value,
    328                                                   self.text_seed,
    329                                                   self.flexo_id.version)
    330             return True
    331         return False
    332 
    333     def _transition(self, target_state: EntityState):
    334         """
    335         I am an internal helper for state transitions with version/fingerprint checks
    336         and forward-only enforcement.
    337         """
    338 
    339         if target_state not in self.allowed_transitions():
    340             raise ValueError(
    341                 f"Illegal state transition: {self.state.name} → {target_state.name}. "
    342             )
    343 
    344         # special case: marking obsolete
    345         if target_state == EntityState.OBSOLETE:
    346             self.flexo_id = self.flexo_id.with_state(EntityState.OBSOLETE.value)
    347             return
    348 
    349         self.state = target_state
     343        changed = new_fp != self.fingerprint
     344        self.fingerprint = new_fp
     345        return changed
    350346
    351347    # ───────────────────────────────────────────────────────────────
     
    380376        """
    381377        if self.state == EntityState.DRAFT:
    382             new_fid = FlexOID.safe_generate(self.domain_id,
    383                                             self.entity_type.value,
    384                                             EntityState.APPROVED.value,
    385                                             self.text_seed,
     378            new_fid = FlexOID.safe_generate(domain_id=self.domain_id,
     379                                            entity_type=self.entity_type.value,
     380                                            state=EntityState.APPROVED.value,
     381                                            text=self.text_seed,
    386382                                            version=self.version
    387383                                            )
     
    401397
    402398        # Change only the trailing state letter (A → S)
     399        old_id = self.flexo_id
    403400        self.flexo_id = self.flexo_id.with_state(EntityState.APPROVED_AND_SIGNED.value)
     401        self.flexo_id.origin = old_id
    404402        return self
    405403
     
    417415
    418416        new_fid = FlexOID.safe_generate(
    419             self.domain_id,
    420             self.entity_type.value,
    421             EntityState.PUBLISHED.value,
    422             self.text_seed,
     417            domain_id=self.domain_id,
     418            entity_type=self.entity_type.value,
     419            state=EntityState.PUBLISHED.value,
     420            text=self.text_seed,
    423421            version=self.version
    424422        )
     
    435433            )
    436434        if self.state != EntityState.OBSOLETE:
    437             self._transition(EntityState.OBSOLETE)
     435            new_fid = FlexOID.safe_generate(
     436                domain_id=self.domain_id,
     437                entity_type=self.entity_type.value,
     438                state=EntityState.OBSOLETE.value,
     439                text=self.text_seed,
     440                version=self.version
     441            )
     442
     443        self.origin = self.flexo_id
     444        self.flexo_id = new_fid
     445
    438446        return self
    439447
     
    442450        self.origin = str(self.flexo_id)
    443451        self.flexo_id = FlexOID.clone_new_base(
    444             self.domain_id,
    445             self.entity_type.value,
    446             EntityState.DRAFT.value,
    447             self.text_seed,
     452            domain_id=self.domain_id,
     453            entity_type=self.entity_type.value,
     454            state=EntityState.DRAFT.value,
     455            text=self.text_seed,
    448456        )
    449457        return self
     
    459467        Returns False if:
    460468          - flexo_id format is invalid
    461           - derived state/type mismatch the ID
    462           - fingerprint doesn't match the canonical text seed
    463469        """
    464470        try:
    465471            if not isinstance(entity.flexo_id, FlexOID):
    466                 return False
    467 
    468             # Validate domain and ID coherence
    469             if entity.domain_id != entity.flexo_id.domain_id:
    470                 return False
    471             if entity.entity_type.value != entity.flexo_id.entity_type:
    472                 return False
    473             if entity.state.value != entity.flexo_id.state_code:
    474472                return False
    475473
  • flexoentity/flexo_signature.py

    r9a50e0b ref964d8  
    44from uuid import UUID
    55from flexoentity import FlexoEntity, FlexOID, EntityType
     6
    67
    78@dataclass
     
    4647    public_cert_path: Optional[str] = None
    4748
     49    def to_dict(self):
     50        return {
     51            "platform": self.platform,
     52            "identifier": self.identifier,
     53            "private_key_path": self.private_key_path,
     54            "public_cert_path": self.public_cert_path
     55        }
     56
     57
    4858@dataclass
    4959class FlexoSignature(FlexoEntity):
     
    7585    def _serialize_content(self):
    7686        return {
    77             "signed_entity": self.signed_entity,
     87            "signed_entity": str(self.signed_entity),
    7888            "signer_id": str(self.signer_id),
    7989            "signature_data": self.signature_data,
    80             "certificate_reference": self.certificate_reference.to_dict(),
    81             "certificate_thumbprint": self.certificate_thumbprint
     90            "signature_type": self.signature_type,
     91            # "certificate_reference": self.certificate_reference.to_dict(),
     92            "certificate_thumbprint": self.certificate_thumbprint,
     93            "comment": self.comment
    8294        }
    8395
  • flexoentity/id_factory.py

    r9a50e0b ref964d8  
    7878from flexoentity import logger
    7979
    80 def canonical_seed(obj) -> str:
    81     """
    82     I transform *obj* into a deterministic, comparable text form.
    83 
    84     I remove irrelevant formatting differences so that two equal
    85     pieces of data always yield the same hash seed.
    86 
    87     Rules:
    88     - If *obj* is a string, I normalize whitespace.
    89     - If *obj* is a dict, I JSON-encode it with sorted keys.
    90     - If *obj* is an object, I recurse on its __dict__.
    91     - Otherwise, I coerce *obj* to str().
    92     """
    93 
    94     if isinstance(obj, str):
    95         text = " ".join(obj.split())
    96         return text
    97     if isinstance(obj, dict):
    98         return json.dumps(obj, sort_keys=True, separators=(",", ":"))
    99     if hasattr(obj, "__dict__"):
    100         return canonical_seed(obj.__dict__)
    101     return str(obj)
    102 
     80def canonical_seed(text: str) -> str:
     81    """Canonicalize identity-only text_seed."""
     82    if not text:
     83        return ""
     84    # remove control characters
     85    s = re.sub(r"[\t\r\n]+", " ", text)
     86    # collapse multiple spaces
     87    s = re.sub(r"\s+", " ", s)
     88    return s.strip()
    10389
    10490class FlexOID(str):
     
    346332
    347333    @staticmethod
    348     def clone_new_base(domain: str, entity_type: str, state: str, text: str) -> "FlexOID":
     334    def clone_new_base(domain_id: str, entity_type: str, state: str, text: str) -> "FlexOID":
    349335        """
    350336        I start a completely new lineage (version 1) for a derived entity.
     
    352338        not share version history with its origin.
    353339        """
    354         return FlexOID.safe_generate(domain, entity_type, state, text, version=1)
     340        return FlexOID.safe_generate(domain_id, entity_type, state, text, version=1)
    355341
    356342    def to_dict(self) -> dict:
  • tests/conftest.py

    r9a50e0b ref964d8  
    1 # tests/stubs/single_choice_question.py
    2 from dataclasses import dataclass, field
    31import pytest
    42import platform
    53from pathlib import Path
    64from datetime import datetime
    7 from typing import List
    8 from flexoentity import FlexOID, FlexoEntity, EntityType, EntityState, Domain, get_signing_backend, CertificateReference
     5from flexoentity import Domain, FlexoSignature
     6from flexoentity import get_signing_backend, CertificateReference
    97
    108
     
    1816    return FixedDate
    1917
    20 @dataclass
    21 class AnswerOption:
    22     id: str
    23     text: str
    24     points: float = 0.0
    25 
    26     def to_dict(self):
    27         return {"id": self.id, "text": self.text, "points": self.points}
    28 
    29     @classmethod
    30     def from_dict(cls, data):
    31         return cls(
    32             id=data.get("id", ""),
    33             text=data.get("text", ""),
    34             points=data.get("points", 0.0)
    35         )
    36 
    37 
    38 @dataclass
    39 class SingleChoiceQuestion(FlexoEntity):
    40     """A minimal stub to test FlexoEntity integration."""
    41     ENTITY_TYPE = EntityType.ITEM
    42 
    43     text: str = ""
    44     options: List[AnswerOption] = field(default_factory=list)
    45 
    46     def __post_init__(self):
    47         # If no FlexOID yet, generate a draft ID now.
    48         if not getattr(self, "flexo_id", None):
    49             self.flexo_id = FlexOID.safe_generate(
    50                 domain_id=self.domain_id,
    51                 entity_type=SingleChoiceQuestion.ENTITY_TYPE.value,     # 'I'
    52                 state=EntityState.DRAFT.value,        # 'D'
    53                 text=self.text_seed or self.text,
    54                 version=1,
    55             )
    56 
    57     @classmethod
    58     def default(cls):
    59         return cls()
    60 
    61     def _serialize_content(self):
    62         return {
    63             "text": self.text,
    64             "options": [opt.to_dict() for opt in self.options],
    65         }
    66 
    67     @property
    68     def text_seed(self) -> str:
    69         """Include answer options (and points) for deterministic ID generation."""
    70 
    71         joined = "|".join(
    72             f"{opt.text.strip()}:{opt.points}"
    73             for opt in sorted(self.options, key=lambda o: o.text.strip().lower())
    74         )
    75         return f"{self.text}{joined}"
    76 
    77     @classmethod
    78     def from_dict(cls, data):
    79         obj = cls(text=data.get("text", ""),
    80             options=[AnswerOption.from_dict(o) for o in data.get("options", [])],
    81         )
    82         # restore FlexoEntity core fields
    83         if "flexo_id" in data:
    84             obj.flexo_id = FlexOID.to_dict(data["flexo_id"])
    85         return obj
    8618
    8719@pytest.fixture
     
    9224                                 description="ALL ABOUT ARITHMETIC IN PYTHON")
    9325
    94 @pytest.fixture
    95 def sample_question(sample_domain):
    96     q = SingleChoiceQuestion.with_domain_id(domain_id=sample_domain.domain_id,
    97                                             text="What is 2 + 2?",
    98                                             options=[])
    99     q._update_fingerprint()
    100     return q
    10126
    10227SYSTEM = platform.system()
     
    181106    except Exception as e:
    182107        pytest.skip(f"Backend unavailable or misconfigured: {e}")
     108
     109@pytest.fixture
     110def sample_signature(sample_domain, cert_ref_linux):
     111    return FlexoSignature.with_domain_id(domain_id="SIG", signed_entity=sample_domain,
     112                                         certificate_reference=cert_ref_linux,
     113                                         comment="This is a mock signature")
     114
  • tests/test_flexoid.py

    r9a50e0b ref964d8  
    111111# ──────────────────────────────────────────────
    112112
    113 def test_canonical_seed_for_strings_and_dicts():
     113def test_canonical_seed_for_strings():
    114114    s1 = "  Hello   world "
    115115    s2 = "Hello world"
    116116    assert canonical_seed(s1) == canonical_seed(s2)
    117 
    118     d1 = {"b": 1, "a": 2}
    119     d2 = {"a": 2, "b": 1}
    120     assert canonical_seed(d1) == canonical_seed(d2)
    121 
    122     class Dummy:
    123         def __init__(self):
    124             self.x = 1
    125             self.y = 2
    126     obj = Dummy()
    127     assert isinstance(canonical_seed(obj), str)
    128 
    129117
    130118# ──────────────────────────────────────────────
  • tests/test_id_lifecycle.py

    r9a50e0b ref964d8  
    33
    44
    5 # ──────────────────────────────────────────────────────────────────────────────
    6 # Tests adapted to use real SingleChoiceQuestion fixture instead of DummyEntity
    7 # ──────────────────────────────────────────────────────────────────────────────
     5def test_initial_state(sample_domain):
     6    assert sample_domain.state == EntityState.DRAFT
     7    assert sample_domain.flexo_id.version == 1
     8    assert FlexoEntity.verify_integrity(sample_domain)
    89
    9 def test_initial_state(sample_question):
    10     q = sample_question
    11     assert q.state == EntityState.DRAFT
    12     assert q.flexo_id.version == 1
    13     assert FlexoEntity.verify_integrity(q)
    14 
    15 def test_approval_does_not_bump_version(sample_question):
    16     q = sample_question
     10def test_approval_does_not_bump_version(sample_domain):
     11    q = sample_domain
    1712    q.approve()
    1813    assert q.state == EntityState.APPROVED
    1914    assert q.flexo_id.version == 1
    2015
    21 def test_signing_does_not_bump_version(sample_question):
    22     q = sample_question
     16def test_signing_does_not_bump_version(sample_domain):
     17    q = sample_domain
    2318    q.approve()
    2419    before = q.flexo_id
     
    3833
    3934
    40 def test_publish_does_not_bump_version(sample_question):
    41     q = sample_question
     35def test_publish_does_not_bump_version(sample_domain):
     36    q = sample_domain
    4237    q.approve()
    4338    q.sign()
     
    4843
    4944
    50 def test_modify_content_changes_fingerprint(sample_question):
    51     q = sample_question
    52     q.text += "Rephrased content"  # simulate text change
    53     changed = q._update_fingerprint()
     45def test_modify_content_changes_fingerprint(sample_signature):
     46    sample_signature.comment += "Rephrased content"  # simulate text change
     47    changed = sample_signature._update_fingerprint()
    5448    assert changed
    5549
    5650
    57 def test_no_version_bump_on_draft_edits(sample_question):
    58     q = sample_question
    59     q.text = "Minor draft edit"
    60     q._update_fingerprint()
    61     assert q.flexo_id.version == 1
     51def test_no_version_bump_on_draft_edits(sample_signature):
     52    sample_signature.comment = "Minor draft edit"
     53    sample_signature._update_fingerprint()
     54    assert sample_signature.flexo_id.version == 1
    6255
    6356
    64 def test_version_bump_after_edit_and_sign(sample_question):
    65     q = sample_question
    66     q.approve()
    67     v1 = str(q.flexo_id)
    68     q.text = "Changed content"
    69     q.sign()
    70     assert str(q.flexo_id) != v1
     57def test_version_bump_after_edit_and_sign(sample_signature):
     58    sample_signature.approve()
     59    v1 = str(sample_signature.flexo_id)
     60    sample_signature.comment = "Changed comment"
     61    sample_signature.sign()
     62    assert str(sample_signature.flexo_id) != v1
    7163
    7264
    73 def test_integrity_check_passes_and_fails(sample_question):
    74     q = sample_question
    75     q.approve()
    76     assert FlexoEntity.verify_integrity(q)
     65def test_integrity_check_passes_and_fails(sample_signature):
     66    sample_signature.approve()
     67    assert FlexoEntity.verify_integrity(sample_signature)
    7768
    7869    # simulate tampering
    79     q.text = "Tampered text"
    80     assert not FlexoEntity.verify_integrity(q)
     70    sample_signature.comment = "Tampered text"
     71    assert not FlexoEntity.verify_integrity(sample_signature)
    8172
    8273
    83 def test_obsolete_state(sample_question):
    84     q = sample_question
     74def test_obsolete_state(sample_domain):
     75    q = sample_domain
    8576    q.approve()
    8677    q.sign()
     
    9081
    9182
    92 def test_clone_new_base_resets_lineage(sample_question):
    93     q = sample_question
     83def test_clone_new_base_resets_lineage(sample_domain):
     84    q = sample_domain
    9485    q.approve()
    9586    q.sign()
     
    10293    assert q.flexo_id.version == 1
    10394
    104 def test_clone_new_base_sets_origin(sample_question):
    105     q = sample_question
     95def test_clone_new_base_sets_origin(sample_domain):
     96    q = sample_domain
    10697    q.approve()
    10798    q.sign()
     
    115106    assert q.flexo_id != old_id
    116107
    117 def test_mass_version_increments_until_obsolete(sample_question):
    118     q = sample_question
     108def test_mass_version_increments_until_obsolete(sample_domain):
     109    q = sample_domain
    119110    q.approve()
    120111    for _ in range(FlexOID.MAX_VERSION - 1):
  • tests/test_id_stress.py

    r9a50e0b ref964d8  
    99
    1010import pytest
    11 
    12 from flexoentity import FlexOID, EntityType, EntityState
     11from uuid import uuid4
     12from flexoentity import FlexOID, EntityType, EntityState, FlexoSignature
    1313
    1414logger = logging.getLogger(__name__)
     
    6666    assert id1 == id2
    6767
     68def test_massive_lifecycle_simulation(cert_ref_linux, sample_domain):
     69    """
     70    Generate 100 random FlexoSignatures, mutate content, run through lifecycle,
     71    and ensure all FlexOIDs are unique and valid.
     72    """
     73    entities = []
    6874
    69 def test_massive_lifecycle_simulation(sample_question):
    70     """
    71     Generate 100 random SingleChoiceQuestions, simulate multiple edits and state transitions,
    72     ensure all final IDs and fingerprints are unique and valid.
    73     """
    74     entities = [
    75         copy.deepcopy(sample_question) for _ in range(100)
    76     ]
     75    for i in range(100):
     76        sig = FlexoSignature.with_domain_id(
     77            domain_id="SIGTEST",
     78            signed_entity=sample_domain.flexo_id,
     79            signer_id=uuid4(),
     80            certificate_reference=cert_ref_linux,
     81            comment=f"Initial signature #{i}"
     82        )
     83        entities.append(sig)
    7784
     85    # Mutate + lifecycle transitions
    7886    for i, e in enumerate(entities):
    79         # random edit
    80         e.text += f" updated #{i}"
     87        # CONTENT CHANGE → fingerprint changes → hash → FlexOID.prefix changes
     88        e.comment += f" updated-{i}"
    8189        e._update_fingerprint()
    8290
     
    8896            e.publish()
    8997
    90     flexoids = [e.flexo_id for e in entities]
    91     assert len(flexoids) == len(set(flexoids)), "Duplicate FlexOIDs after lifecycle simulation"
     98    # Check ID uniqueness
     99    flexoids = [str(e.flexo_id) for e in entities]
     100    assert len(flexoids) == len(set(flexoids)), "Duplicate FlexOIDs detected"
  • tests/test_persistance_integrity.py

    r9a50e0b ref964d8  
    66import pytest
    77
    8 from flexoentity import EntityState, EntityType, Domain
     8from flexoentity import Domain
     9
    910
    1011@pytest.fixture
    11 def approved_question():
    12     """Provide a fully approved and published SingleChoiceQuestion for persistence tests."""
    13     q = SingleChoiceQuestion(
    14         domain=Domain(domain="GEN", entity_type=EntityType.DOMAIN, state=EntityState.DRAFT),
    15         entity_type=None,  # SingleChoiceQuestion sets this internally to EntityType.ITEM
    16         state=EntityState.DRAFT,
    17         text="What is Ohm’s law?",
    18         options=[
    19             AnswerOption(id="OP1", text="U = R × I", points=1),
    20             AnswerOption(id="OP2", text="U = I / R", points=0),
    21             AnswerOption(id="OP3", text="R = U × I", points=0),
    22         ],
    23     )
    24     q.approve()
    25     q.sign()
    26     q.publish()
    27     return q
     12def approved_domain(sample_domain):
     13    """Provide a fully approved and published Domain for persistence tests."""
     14    sample_domain.approve()
     15    sample_domain.sign()
     16    sample_domain.publish()
     17    return sample_domain
    2818
    2919
    30 @pytest.mark.skip(reason="FlexOIDs regenerated on import; enable once JSON format is stable")
    31 def test_json_roundtrip_preserves_integrity(approved_question):
     20def test_json_roundtrip_preserves_integrity(approved_domain):
    3221    """
    3322    Export to JSON and reload — ensure fingerprints and signatures remain valid.
    3423    """
    35     json_str = approved_question.to_json()
    36     loaded = SingleChoiceQuestion.from_json(json_str)
     24    json_str = approved_domain.to_json()
     25    loaded = Domain.from_json(json_str)
    3726
    3827    # Fingerprint and state should match — integrity must pass
    39     assert SingleChoiceQuestion.verify_integrity(loaded)
     28    assert Domain.verify_integrity(loaded)
    4029
    4130    # Metadata should be preserved exactly
    42     assert approved_question.fingerprint == loaded.fingerprint
    43     assert approved_question.flexo_id == loaded.flexo_id
    44     assert loaded.state == approved_question.state
     31    assert approved_domain.fingerprint == loaded.fingerprint
     32    assert approved_domain.flexo_id == loaded.flexo_id
     33    assert loaded.state == approved_domain.state
    4534
    46 @pytest.mark.skip(reason="FlexOIDs regenerated on import; tampering detection not yet implemented")
    47 def test_json_tampering_detection(approved_question):
     35
     36def test_json_tampering_detection(approved_domain):
    4837    """Tampering with content should invalidate fingerprint verification."""
    49     json_str = approved_question.to_json()
     38    json_str = approved_domain.to_json()
    5039    tampered = json.loads(json_str)
    51     tampered["text"] = "Tampered content injection"
     40    tampered["content"]["fullname"] = "Tampered content injection"
    5241    tampered_json = json.dumps(tampered)
    5342
    54     loaded = SingleChoiceQuestion.from_json(tampered_json)
    55     assert not SingleChoiceQuestion.verify_integrity(loaded)
     43    loaded = Domain.from_json(tampered_json)
     44    assert not Domain.verify_integrity(loaded)
    5645
    57 @pytest.mark.skip(reason="FlexOIDs regenerated on import; corruption detection not yet applicable")
    58 def test_json_file_corruption(approved_question, tmp_path):
     46
     47def test_json_file_corruption(approved_domain, tmp_path):
    5948    """Simulate file corruption — integrity check must fail."""
    6049    file = tmp_path / "question.json"
    61     json_str = approved_question.to_json()
     50    json_str = approved_domain.to_json()
     51    print("JSON", json_str)
    6252    file.write_text(json_str)
    6353
    6454    # Corrupt the file (simulate accidental byte modification)
    65     corrupted = json_str.replace("Ohm’s", "Omm’s")
     55    corrupted = json_str.replace("ARITHMETIC", "ARITHM")
    6656    file.write_text(corrupted)
    6757
    68     loaded = SingleChoiceQuestion.from_json(file.read_text())
    69     assert not SingleChoiceQuestion.verify_integrity(loaded)
     58    loaded = Domain.from_json(file.read_text())
     59    assert not Domain.verify_integrity(loaded)
Note: See TracChangeset for help on using the changeset viewer.