Changeset 753855a in flexoentity


Ignore:
Timestamp:
12/03/25 22:14:53 (6 weeks ago)
Author:
Enrico Schwass <ennoausberlin@…>
Branches:
master
Children:
ea28ca0
Parents:
4459fa4
Message:

add more backends and tests

Files:
4 added
4 edited

Legend:

Unmodified
Added
Removed
  • flexoentity/json_file_backend.py

    r4459fa4 r753855a  
    11import json
    22from .persistance_backend import PersistanceBackend
     3from .in_memory_backend import InMemoryBackend
    34
    45
    56class JsonFileBackend(PersistanceBackend):
    6     def __init__(self, path: str):
    7         self.path = path
    8         self.data = []
    9         self._load_file()
     7    """
     8    JSON-file based backend.
    109
    11     def _load_file(self):
    12         try:
    13             with open(self.path, "r", encoding="utf-8") as f:
    14                 self.data = json.load(f)
    15         except FileNotFoundError:
    16             self.data = []
     10    Uses an internal InMemoryBackend and syncs to a single JSON file on disk.
     11    """
    1712
    18     def save(self, entity_dict):
    19         self.data.append(entity_dict)
    20         self._flush()
     13    def __init__(self, entity_class, path):
     14        self._mem = InMemoryBackend(entity_class)
     15        self._path = path
    2116
    22     def update(self, entity_dict):
    23         fid = entity_dict["flexo_id"]
    24         self.data = [d for d in self.data if d["flexo_id"] != fid]
    25         self.data.append(entity_dict)
    26         self._flush()
     17    @property
     18    def entity_class(self):
     19        return self._mem.entity_class
    2720
    28     def delete(self, flexo_id):
    29         self.data = [d for d in self.data if d["flexo_id"] != flexo_id]
    30         self._flush()
     21    # core API just delegates to memory backend
    3122
    32     def load(self, flexo_id):
    33         for d in self.data:
    34             if d["flexo_id"] == flexo_id:
    35                 return d
    36         return None
     23    def save(self, entity):
     24        self._mem.save(entity)
     25
     26    def update(self, entity):
     27        self._mem.update(entity)
     28
     29    def delete(self, flexo_id: str):
     30        self._mem.delete(flexo_id)
     31
     32    def load(self, flexo_id: str):
     33        return self._mem.load(flexo_id)
    3734
    3835    def load_all(self):
    39         return self.data
     36        return self._mem.load_all()
    4037
    41     def _flush(self):
    42         with open(self.path, "w", encoding="utf-8") as f:
    43             json.dump(self.data, f, ensure_ascii=False, indent=2)
     38    def clear(self):
     39        self._mem.clear()
     40
     41    # file sync
     42
     43    def flush_to_file(self):
     44        data = [e.to_dict() for e in self._mem.load_all()]
     45        with open(self._path, "w", encoding="utf-8") as f:
     46            json.dump(data, f, ensure_ascii=False, indent=2)
     47
     48    def load_from_file(self):
     49        try:
     50            with open(self._path, "r", encoding="utf-8") as f:
     51                data = json.load(f)
     52        except FileNotFoundError:
     53            return
     54
     55        self._mem.clear()
     56        for d in data:
     57            self._mem.save(self.entity_class.from_dict(d))
  • flexoentity/sqlite_entity_backend.py

    r4459fa4 r753855a  
    1 import sqlite3
    21import json
    3 from typing import Optional, Iterable
    4 
    52from .persistance_backend import PersistanceBackend
    63
     
    85class SQLiteEntityBackend(PersistanceBackend):
    96    """
    10     Generic SQLite persistence layer for FlexOEntity subclasses.
    11    
    12     Subclasses must define:
    13       - TABLE_NAME        (e.g. "users")
    14       - ENTITY_CLASS      (e.g. FlexoUser)
    15       - INDEXED_FIELDS    (list of field names to index)
     7    SQLite backend storing **dicts**, not entities.
     8    Managers do the entity conversions.
    169    """
    1710
    18     TABLE_NAME = None
    19     ENTITY_CLASS = None
    20     INDEXED_FIELDS: list[str] = []
    21 
    22     def __init__(self, db_path: str):
    23         if self.TABLE_NAME is None:
    24             raise ValueError("Subclasses must define TABLE_NAME")
    25         if self.ENTITY_CLASS is None:
    26             raise ValueError("Subclasses must define ENTITY_CLASS")
    27 
    28         self.db_path = db_path
    29         self.conn = sqlite3.connect(db_path)
    30         self.conn.row_factory = sqlite3.Row
     11    def __init__(self, entity_class, conn, table_name):
     12        self.entity_class = entity_class
     13        self.conn = conn
     14        self.table = table_name
    3115        self._init_schema()
    3216
    33     # -----------------------------------------------------------
    34     # Schema
    35     # -----------------------------------------------------------
    3617    def _init_schema(self):
    37         # Base schema: store full entity JSON and the FlexOID
    38         columns = [
    39             "flexo_id TEXT PRIMARY KEY",
    40             "json TEXT NOT NULL",
    41         ]
    42 
    43         # Add indexed metadata columns
    44         for field in self.INDEXED_FIELDS:
    45             columns.append(f"{field} TEXT")
    46 
    47         colspec = ", ".join(columns)
    48 
    49         self.conn.execute(
    50             f"CREATE TABLE IF NOT EXISTS {self.TABLE_NAME} ({colspec})"
    51         )
    52 
    53         # Create indices on metadata fields
    54         for field in self.INDEXED_FIELDS:
    55             self.conn.execute(
    56                 f"CREATE INDEX IF NOT EXISTS idx_{self.TABLE_NAME}_{field} "
    57                 f"ON {self.TABLE_NAME}({field})"
     18        self.conn.execute(f"""
     19            CREATE TABLE IF NOT EXISTS {self.table} (
     20                flexo_id TEXT PRIMARY KEY,
     21                json     TEXT NOT NULL
    5822            )
    59 
     23        """)
    6024        self.conn.commit()
    6125
    62     # -----------------------------------------------------------
    63     # CRUD operations
    64     # -----------------------------------------------------------
    65     def add(self, obj):
    66         data = json.dumps(obj.to_dict(), ensure_ascii=False)
    67 
    68         # Collect row data dynamically
    69         fields = ["flexo_id", "json"] + self.INDEXED_FIELDS
    70         placeholders = ", ".join("?" for _ in fields)
    71         sql = (
    72             f"INSERT INTO {self.TABLE_NAME} ({', '.join(fields)}) "
    73             f"VALUES ({placeholders})"
    74         )
    75 
    76         params = [obj.flexo_id.value, data]
    77         params.extend(getattr(obj, f) for f in self.INDEXED_FIELDS)
    78 
    79         self.conn.execute(sql, params)
    80         self.conn.commit()
    81 
    82     def update(self, obj):
    83         data = json.dumps(obj.to_dict(), ensure_ascii=False)
    84         set_parts = ["json = ?"] + [f"{f} = ?" for f in self.INDEXED_FIELDS]
    85 
    86         sql = (
    87             f"UPDATE {self.TABLE_NAME} SET "
    88             f"{', '.join(set_parts)} "
    89             f"WHERE flexo_id = ?"
    90         )
    91 
    92         params = [data]
    93         params.extend(getattr(obj, f) for f in self.INDEXED_FIELDS)
    94         params.append(obj.flexo_id.value)
    95 
    96         self.conn.execute(sql, params)
    97         self.conn.commit()
    98 
    99     def delete(self, flexo_id: str):
     26    def save(self, entity):
     27        entity_dict = entity.to_dict()
     28        fid = entity_dict["meta"]["flexo_id"]
    10029        self.conn.execute(
    101             f"DELETE FROM {self.TABLE_NAME} WHERE flexo_id = ?",
    102             (flexo_id,),
     30            f"INSERT OR REPLACE INTO {self.table} (flexo_id, json) VALUES (?, ?)",
     31            (fid, json.dumps(entity_dict))
    10332        )
    10433        self.conn.commit()
    10534
    106     # -----------------------------------------------------------
    107     # Retrieval
    108     # -----------------------------------------------------------
    109     def get(self, flexo_id: str):
     35    def update(self, entity):
     36        entity_dict = entity.to_dict()
     37        fid = entity_dict["meta"]["flexo_id"]
     38        self.conn.execute(
     39            f"UPDATE {self.table} SET json = ? WHERE flexo_id = ?",
     40            (json.dumps(entity_dict), fid)
     41        )
     42        self.conn.commit()
     43
     44    def delete(self, flexo_id):
     45        self.conn.execute(
     46            f"DELETE FROM {self.table} WHERE flexo_id = ?",
     47            (flexo_id,)
     48        )
     49        self.conn.commit()
     50
     51    def load(self, flexo_id):
    11052        row = self.conn.execute(
    111             f"SELECT json FROM {self.TABLE_NAME} WHERE flexo_id = ?",
     53            f"SELECT json FROM {self.table} WHERE flexo_id = ?",
    11254            (flexo_id,),
    11355        ).fetchone()
     56
    11457        if not row:
    11558            return None
    116         return self.ENTITY_CLASS.from_dict(json.loads(row["json"]))
    11759
    118     def all(self):
     60        return self.entity_class.from_dict(json.loads(row["json"]))
     61
     62    def load_all(self):
    11963        rows = self.conn.execute(
    120             f"SELECT json FROM {self.TABLE_NAME}"
     64            f"SELECT json FROM {self.table}"
    12165        ).fetchall()
    122         return [
    123             self.ENTITY_CLASS.from_dict(json.loads(r["json"])) for r in rows
    124         ]
    12566
    126     # -----------------------------------------------------------
    127     # Dynamic finders: find_by_<field>
    128     # -----------------------------------------------------------
    129     def __getattr__(self, name):
    130         # e.g. find_by_username("foo")
    131         if name.startswith("find_by_"):
    132             field = name[len("find_by_"):]
    133             if field not in self.INDEXED_FIELDS:
    134                 raise AttributeError(f"No such indexed field: {field}")
     67        return [self.entity_class.from_dict(json.loads(r["json"])) for r in rows]
    13568
    136             def finder(value):
    137                 rows = self.conn.execute(
    138                     f"SELECT json FROM {self.TABLE_NAME} WHERE {field} = ?",
    139                     (value,),
    140                 )
    141                 for row in rows:
    142                     yield self.ENTITY_CLASS.from_dict(json.loads(row["json"]))
    143 
    144             return finder
    145 
    146         raise AttributeError(name)
     69    def clear(self):
     70        self.conn.execute(f"DELETE FROM {self.table}")
     71        self.conn.commit()
  • tests/conftest.py

    r4459fa4 r753855a  
    33from pathlib import Path
    44from datetime import datetime
    5 from flexoentity import Domain, FlexoSignature, DomainManager, EntityRegistry, CompositeBackend
     5from flexoentity import Domain, FlexoSignature, DomainManager, EntityRegistry, CompositeBackend, InMemoryBackend
    66from flexoentity import get_signing_backend, CertificateReference
    77
     
    2626
    2727SYSTEM = platform.system()
     28
     29@pytest.fixture
     30def backend():
     31    return InMemoryBackend(Domain)
    2832
    2933
     
    8185
    8286@pytest.fixture(scope="session")
    83 def backend(test_cert, test_key):
     87def signing_backend(test_cert, test_key):
    8488    """Return the correct backend for the current platform."""
    8589
  • tests/test_signing.py

    r4459fa4 r753855a  
    44from flexoentity import FlexOID, EntityState, EntityType, FlexoSignature, get_signing_backend
    55
    6 def test_sign_and_verify_linux(backend):
     6def test_sign_and_verify_linux(signing_backend):
    77    data = b"Hello Flex-O signing!"
    88
    9     signature = backend.sign(data)
     9    signature = signing_backend.sign(data)
    1010    assert isinstance(signature, bytes)
    1111    assert len(signature) > 20      # sanity check
    1212
    13     assert backend.verify(data, signature) is True
     13    assert signing_backend.verify(data, signature) is True
    1414
    15 def test_sign_and_verify_macos(backend):
     15def test_sign_and_verify_macos(signing_backend):
    1616    data = b"Hello Flex-O signing!"
    1717
    18     signature = backend.sign(data)
     18    signature = signing_backend.sign(data)
    1919    assert isinstance(signature, bytes)
    2020    assert len(signature) > 20      # sanity check
    2121
    22     assert backend.verify(data, signature) is True
     22    assert signing_backend.verify(data, signature) is True
    2323
    24 def test_verify_fails_with_wrong_data(backend):
     24def test_verify_fails_with_wrong_data(signing_backend):
    2525    data = b"Original Data"
    2626    wrong_data = b"Tampered Data"
    2727
    28     signature = backend.sign(data)
     28    signature = signing_backend.sign(data)
    2929
    30     assert backend.verify(data, signature) == True
    31     assert backend.verify(wrong_data, signature) == False
     30    assert signing_backend.verify(data, signature) == True
     31    assert signing_backend.verify(wrong_data, signature) == False
    3232
    33 def test_verify_fails_with_invalid_signature(backend):
     33def test_verify_fails_with_invalid_signature(signing_backend):
    3434    data = b"Hello world"
    3535    invalid_sig = b"\x00\x01\x02garbagepkcs7data"
    3636
    37     assert backend.verify(data, invalid_sig) is False
     37    assert signing_backend.verify(data, invalid_sig) is False
    3838
    39 def test_signature_entity_create_and_verify(backend):
     39def test_signature_entity_create_and_verify(signing_backend):
    4040    entity_id = FlexOID.safe_generate(
    4141        domain_id="TEST",
     
    5252        entity=entity_id,
    5353        signer_id=signer,
    54         backend=backend,
     54        backend=signing_backend,
    5555    )
    5656
     
    6161    assert sig.certificate_thumbprint != ""
    6262
    63     assert sig.verify(data, backend) is True
     63    assert sig.verify(data, signing_backend) is True
Note: See TracChangeset for help on using the changeset viewer.