Changeset 753855a in flexoentity
- Timestamp:
- 12/03/25 22:14:53 (6 weeks ago)
- Branches:
- master
- Children:
- ea28ca0
- Parents:
- 4459fa4
- Files:
-
- 4 added
- 4 edited
-
flexoentity/json_file_backend.py (modified) (1 diff)
-
flexoentity/sqlite_entity_backend.py (modified) (2 diffs)
-
tests/conftest.py (modified) (3 diffs)
-
tests/test_composite_backend.py (added)
-
tests/test_in_memory_backend.py (added)
-
tests/test_json_file_backend.py (added)
-
tests/test_signing.py (modified) (3 diffs)
-
tests/test_sqlite_backend.py (added)
Legend:
- Unmodified
- Added
- Removed
-
flexoentity/json_file_backend.py
r4459fa4 r753855a 1 1 import json 2 2 from .persistance_backend import PersistanceBackend 3 from .in_memory_backend import InMemoryBackend 3 4 4 5 5 6 class JsonFileBackend(PersistanceBackend): 6 def __init__(self, path: str): 7 self.path = path 8 self.data = [] 9 self._load_file() 7 """ 8 JSON-file based backend. 10 9 11 def _load_file(self): 12 try: 13 with open(self.path, "r", encoding="utf-8") as f: 14 self.data = json.load(f) 15 except FileNotFoundError: 16 self.data = [] 10 Uses an internal InMemoryBackend and syncs to a single JSON file on disk. 11 """ 17 12 18 def save(self, entity_dict):19 self. data.append(entity_dict)20 self._ flush()13 def __init__(self, entity_class, path): 14 self._mem = InMemoryBackend(entity_class) 15 self._path = path 21 16 22 def update(self, entity_dict): 23 fid = entity_dict["flexo_id"] 24 self.data = [d for d in self.data if d["flexo_id"] != fid] 25 self.data.append(entity_dict) 26 self._flush() 17 @property 18 def entity_class(self): 19 return self._mem.entity_class 27 20 28 def delete(self, flexo_id): 29 self.data = [d for d in self.data if d["flexo_id"] != flexo_id] 30 self._flush() 21 # core API just delegates to memory backend 31 22 32 def load(self, flexo_id): 33 for d in self.data: 34 if d["flexo_id"] == flexo_id: 35 return d 36 return None 23 def save(self, entity): 24 self._mem.save(entity) 25 26 def update(self, entity): 27 self._mem.update(entity) 28 29 def delete(self, flexo_id: str): 30 self._mem.delete(flexo_id) 31 32 def load(self, flexo_id: str): 33 return self._mem.load(flexo_id) 37 34 38 35 def load_all(self): 39 return self. data36 return self._mem.load_all() 40 37 41 def _flush(self): 42 with open(self.path, "w", encoding="utf-8") as f: 43 json.dump(self.data, f, ensure_ascii=False, indent=2) 38 def clear(self): 39 self._mem.clear() 40 41 # file sync 42 43 def flush_to_file(self): 44 data = [e.to_dict() for e in self._mem.load_all()] 45 with open(self._path, "w", encoding="utf-8") as f: 46 json.dump(data, f, ensure_ascii=False, indent=2) 47 48 def load_from_file(self): 49 try: 50 with open(self._path, "r", encoding="utf-8") as f: 51 data = json.load(f) 52 except FileNotFoundError: 53 return 54 55 self._mem.clear() 56 for d in data: 57 self._mem.save(self.entity_class.from_dict(d)) -
flexoentity/sqlite_entity_backend.py
r4459fa4 r753855a 1 import sqlite32 1 import json 3 from typing import Optional, Iterable4 5 2 from .persistance_backend import PersistanceBackend 6 3 … … 8 5 class SQLiteEntityBackend(PersistanceBackend): 9 6 """ 10 Generic SQLite persistence layer for FlexOEntity subclasses. 11 12 Subclasses must define: 13 - TABLE_NAME (e.g. "users") 14 - ENTITY_CLASS (e.g. FlexoUser) 15 - INDEXED_FIELDS (list of field names to index) 7 SQLite backend storing **dicts**, not entities. 8 Managers do the entity conversions. 16 9 """ 17 10 18 TABLE_NAME = None 19 ENTITY_CLASS = None 20 INDEXED_FIELDS: list[str] = [] 21 22 def __init__(self, db_path: str): 23 if self.TABLE_NAME is None: 24 raise ValueError("Subclasses must define TABLE_NAME") 25 if self.ENTITY_CLASS is None: 26 raise ValueError("Subclasses must define ENTITY_CLASS") 27 28 self.db_path = db_path 29 self.conn = sqlite3.connect(db_path) 30 self.conn.row_factory = sqlite3.Row 11 def __init__(self, entity_class, conn, table_name): 12 self.entity_class = entity_class 13 self.conn = conn 14 self.table = table_name 31 15 self._init_schema() 32 16 33 # -----------------------------------------------------------34 # Schema35 # -----------------------------------------------------------36 17 def _init_schema(self): 37 # Base schema: store full entity JSON and the FlexOID 38 columns = [ 39 "flexo_id TEXT PRIMARY KEY", 40 "json TEXT NOT NULL", 41 ] 42 43 # Add indexed metadata columns 44 for field in self.INDEXED_FIELDS: 45 columns.append(f"{field} TEXT") 46 47 colspec = ", ".join(columns) 48 49 self.conn.execute( 50 f"CREATE TABLE IF NOT EXISTS {self.TABLE_NAME} ({colspec})" 51 ) 52 53 # Create indices on metadata fields 54 for field in self.INDEXED_FIELDS: 55 self.conn.execute( 56 f"CREATE INDEX IF NOT EXISTS idx_{self.TABLE_NAME}_{field} " 57 f"ON {self.TABLE_NAME}({field})" 18 self.conn.execute(f""" 19 CREATE TABLE IF NOT EXISTS {self.table} ( 20 flexo_id TEXT PRIMARY KEY, 21 json TEXT NOT NULL 58 22 ) 59 23 """) 60 24 self.conn.commit() 61 25 62 # ----------------------------------------------------------- 63 # CRUD operations 64 # ----------------------------------------------------------- 65 def add(self, obj): 66 data = json.dumps(obj.to_dict(), ensure_ascii=False) 67 68 # Collect row data dynamically 69 fields = ["flexo_id", "json"] + self.INDEXED_FIELDS 70 placeholders = ", ".join("?" for _ in fields) 71 sql = ( 72 f"INSERT INTO {self.TABLE_NAME} ({', '.join(fields)}) " 73 f"VALUES ({placeholders})" 74 ) 75 76 params = [obj.flexo_id.value, data] 77 params.extend(getattr(obj, f) for f in self.INDEXED_FIELDS) 78 79 self.conn.execute(sql, params) 80 self.conn.commit() 81 82 def update(self, obj): 83 data = json.dumps(obj.to_dict(), ensure_ascii=False) 84 set_parts = ["json = ?"] + [f"{f} = ?" for f in self.INDEXED_FIELDS] 85 86 sql = ( 87 f"UPDATE {self.TABLE_NAME} SET " 88 f"{', '.join(set_parts)} " 89 f"WHERE flexo_id = ?" 90 ) 91 92 params = [data] 93 params.extend(getattr(obj, f) for f in self.INDEXED_FIELDS) 94 params.append(obj.flexo_id.value) 95 96 self.conn.execute(sql, params) 97 self.conn.commit() 98 99 def delete(self, flexo_id: str): 26 def save(self, entity): 27 entity_dict = entity.to_dict() 28 fid = entity_dict["meta"]["flexo_id"] 100 29 self.conn.execute( 101 f" DELETE FROM {self.TABLE_NAME} WHERE flexo_id = ?",102 (f lexo_id,),30 f"INSERT OR REPLACE INTO {self.table} (flexo_id, json) VALUES (?, ?)", 31 (fid, json.dumps(entity_dict)) 103 32 ) 104 33 self.conn.commit() 105 34 106 # ----------------------------------------------------------- 107 # Retrieval 108 # ----------------------------------------------------------- 109 def get(self, flexo_id: str): 35 def update(self, entity): 36 entity_dict = entity.to_dict() 37 fid = entity_dict["meta"]["flexo_id"] 38 self.conn.execute( 39 f"UPDATE {self.table} SET json = ? WHERE flexo_id = ?", 40 (json.dumps(entity_dict), fid) 41 ) 42 self.conn.commit() 43 44 def delete(self, flexo_id): 45 self.conn.execute( 46 f"DELETE FROM {self.table} WHERE flexo_id = ?", 47 (flexo_id,) 48 ) 49 self.conn.commit() 50 51 def load(self, flexo_id): 110 52 row = self.conn.execute( 111 f"SELECT json FROM {self. TABLE_NAME} WHERE flexo_id = ?",53 f"SELECT json FROM {self.table} WHERE flexo_id = ?", 112 54 (flexo_id,), 113 55 ).fetchone() 56 114 57 if not row: 115 58 return None 116 return self.ENTITY_CLASS.from_dict(json.loads(row["json"]))117 59 118 def all(self): 60 return self.entity_class.from_dict(json.loads(row["json"])) 61 62 def load_all(self): 119 63 rows = self.conn.execute( 120 f"SELECT json FROM {self. TABLE_NAME}"64 f"SELECT json FROM {self.table}" 121 65 ).fetchall() 122 return [123 self.ENTITY_CLASS.from_dict(json.loads(r["json"])) for r in rows124 ]125 66 126 # ----------------------------------------------------------- 127 # Dynamic finders: find_by_<field> 128 # ----------------------------------------------------------- 129 def __getattr__(self, name): 130 # e.g. find_by_username("foo") 131 if name.startswith("find_by_"): 132 field = name[len("find_by_"):] 133 if field not in self.INDEXED_FIELDS: 134 raise AttributeError(f"No such indexed field: {field}") 67 return [self.entity_class.from_dict(json.loads(r["json"])) for r in rows] 135 68 136 def finder(value): 137 rows = self.conn.execute( 138 f"SELECT json FROM {self.TABLE_NAME} WHERE {field} = ?", 139 (value,), 140 ) 141 for row in rows: 142 yield self.ENTITY_CLASS.from_dict(json.loads(row["json"])) 143 144 return finder 145 146 raise AttributeError(name) 69 def clear(self): 70 self.conn.execute(f"DELETE FROM {self.table}") 71 self.conn.commit() -
tests/conftest.py
r4459fa4 r753855a 3 3 from pathlib import Path 4 4 from datetime import datetime 5 from flexoentity import Domain, FlexoSignature, DomainManager, EntityRegistry, CompositeBackend 5 from flexoentity import Domain, FlexoSignature, DomainManager, EntityRegistry, CompositeBackend, InMemoryBackend 6 6 from flexoentity import get_signing_backend, CertificateReference 7 7 … … 26 26 27 27 SYSTEM = platform.system() 28 29 @pytest.fixture 30 def backend(): 31 return InMemoryBackend(Domain) 28 32 29 33 … … 81 85 82 86 @pytest.fixture(scope="session") 83 def backend(test_cert, test_key):87 def signing_backend(test_cert, test_key): 84 88 """Return the correct backend for the current platform.""" 85 89 -
tests/test_signing.py
r4459fa4 r753855a 4 4 from flexoentity import FlexOID, EntityState, EntityType, FlexoSignature, get_signing_backend 5 5 6 def test_sign_and_verify_linux( backend):6 def test_sign_and_verify_linux(signing_backend): 7 7 data = b"Hello Flex-O signing!" 8 8 9 signature = backend.sign(data)9 signature = signing_backend.sign(data) 10 10 assert isinstance(signature, bytes) 11 11 assert len(signature) > 20 # sanity check 12 12 13 assert backend.verify(data, signature) is True13 assert signing_backend.verify(data, signature) is True 14 14 15 def test_sign_and_verify_macos( backend):15 def test_sign_and_verify_macos(signing_backend): 16 16 data = b"Hello Flex-O signing!" 17 17 18 signature = backend.sign(data)18 signature = signing_backend.sign(data) 19 19 assert isinstance(signature, bytes) 20 20 assert len(signature) > 20 # sanity check 21 21 22 assert backend.verify(data, signature) is True22 assert signing_backend.verify(data, signature) is True 23 23 24 def test_verify_fails_with_wrong_data( backend):24 def test_verify_fails_with_wrong_data(signing_backend): 25 25 data = b"Original Data" 26 26 wrong_data = b"Tampered Data" 27 27 28 signature = backend.sign(data)28 signature = signing_backend.sign(data) 29 29 30 assert backend.verify(data, signature) == True31 assert backend.verify(wrong_data, signature) == False30 assert signing_backend.verify(data, signature) == True 31 assert signing_backend.verify(wrong_data, signature) == False 32 32 33 def test_verify_fails_with_invalid_signature( backend):33 def test_verify_fails_with_invalid_signature(signing_backend): 34 34 data = b"Hello world" 35 35 invalid_sig = b"\x00\x01\x02garbagepkcs7data" 36 36 37 assert backend.verify(data, invalid_sig) is False37 assert signing_backend.verify(data, invalid_sig) is False 38 38 39 def test_signature_entity_create_and_verify( backend):39 def test_signature_entity_create_and_verify(signing_backend): 40 40 entity_id = FlexOID.safe_generate( 41 41 domain_id="TEST", … … 52 52 entity=entity_id, 53 53 signer_id=signer, 54 backend= backend,54 backend=signing_backend, 55 55 ) 56 56 … … 61 61 assert sig.certificate_thumbprint != "" 62 62 63 assert sig.verify(data, backend) is True63 assert sig.verify(data, signing_backend) is True
Note:
See TracChangeset
for help on using the changeset viewer.
