from flexoentity.in_memory_backend import InMemoryBackend from flexoentity.composite_backend import CompositeBackend from flexoentity.domain import Domain def test_composite_writes_to_all_backends(sample_domain): primary = InMemoryBackend(Domain) secondary = InMemoryBackend(Domain) backend = CompositeBackend(authoritative_backend=primary, sync_backends=[secondary]) backend.save(sample_domain.to_dict()) fid = sample_domain.flexo_id # both must see the entity assert primary.load(fid) is not None assert secondary.load(fid) is not None def test_composite_reads_from_primary_only(sample_domain): primary = InMemoryBackend(Domain) secondary = InMemoryBackend(Domain) backend = CompositeBackend(authoritative_backend=primary, sync_backends=[secondary]) primary.save(sample_domain.to_dict()) # secondary has nothing, but composite should still load from primary fid = sample_domain.flexo_id loaded = Domain.from_dict(backend.load(fid)) assert isinstance(loaded, Domain) def test_composite_clear_propagates(sample_domain): primary = InMemoryBackend(Domain) secondary = InMemoryBackend(Domain) backend = CompositeBackend(authoritative_backend=primary, sync_backends=[secondary]) backend.save(sample_domain.to_dict()) backend.clear() assert primary.load_all() == [] assert secondary.load_all() == []