Changeset 3d598d5 in flexoentity
- Timestamp:
- 12/08/25 15:35:46 (5 weeks ago)
- Branches:
- master
- Children:
- ef5aac4
- Parents:
- e230c40
- Location:
- flexoentity
- Files:
-
- 2 edited
-
domain_manager.py (modified) (1 diff)
-
entity_manager.py (modified) (3 diffs)
Legend:
- Unmodified
- Added
- Removed
-
flexoentity/domain_manager.py
re230c40 r3d598d5 1 1 from __future__ import annotations 2 2 import json 3 4 from typing import Iterable5 3 6 4 from .flexo_collection import FlexoCollection -
flexoentity/entity_manager.py
re230c40 r3d598d5 1 1 from .flexo_entity import FlexoEntity 2 2 from .typed_collection import TypedCollection 3 4 5 class EntityNotFoundError(KeyError): 6 pass 7 8 9 class InvalidEntityError(ValueError): 10 pass 11 12 13 class DuplicateEntityError(ValueError): 14 pass 3 15 4 16 … … 26 38 self.staging_backend = staging_backend 27 39 self.permanent_backend = permanent_backend 40 41 def backend_of_domain_id(self, domain_id: str): 42 for backend_name, backend in [ 43 ("local", self.local_backend), 44 ("staging", self.staging_backend), 45 ("permanent", self.permanent_backend), 46 ]: 47 for dom in backend.load_all(): 48 if dom.domain_id == domain_id: 49 return backend_name 50 return None 51 52 def backend_of_flexo_id(self, flexo_id: str): 53 for backend_name, backend in [ 54 ("local", self.local_backend), 55 ("staging", self.staging_backend), 56 ("permanent", self.permanent_backend), 57 ]: 58 if backend.load(flexo_id) is not None: 59 return backend_name 60 return None 28 61 29 62 # ------------------------------------------------------------------ … … 58 91 def all(self): 59 92 """Load all entities as a list of instances.""" 60 return self.local_backend.load_all() 93 return (self.local_backend.load_all() + self.staging_backend.load_all() + 94 self.permanent_backend.load_all()) 61 95 62 96 def promote_to_staging(self, flexo_id): 63 97 entity = self.local_backend.load(flexo_id) 98 if entity is None: 99 raise EntityNotFoundError 64 100 self.staging_backend.save(entity) 101 self.local_backend.delete(flexo_id) 65 102 66 103 def promote_to_permanent(self, flexo_id): 67 104 entity = self.staging_backend.load(flexo_id) 105 if entity is None: 106 raise EntityNotFoundError 68 107 self.permanent_backend.save(entity) 108 self.staging_backend.delete(flexo_id) 69 109 70 110 def sync_all(self):
Note:
See TracChangeset
for help on using the changeset viewer.
