import json
from copy import deepcopy

from flowtimer.Phase import Phase


class RecurringPhaseSequence:
    """An ordered list of phases that is run through a fixed number of times.

    The sequence keeps track of the phase that is currently active and of how
    many complete passes remain *after* the current one (``passes_left``).
    ``passes_left`` therefore starts at ``repetitions - 1`` and is ``0`` while
    the final pass is running.

    The internal state string is one of ``"initial"``, ``"running"``,
    ``"aborted"`` or ``"completed"``.
    """

    @classmethod
    def from_json(cls, a_json_string):
        """Build objects from a JSON document.

        Every JSON object containing ``title`` and ``initial_ticks`` becomes a
        ``Phase``; every JSON object containing ``phases`` and
        ``initial_repetitions`` becomes an instance of ``cls``.  Any other
        JSON object is reported with a "Wrong format" message on stdout and
        returned as a plain dict.

        Returns whatever ``json.loads`` produces for the top-level value after
        the conversions above have been applied.
        """
        def custom_object_hook(d):
            if 'title' in d and 'initial_ticks' in d:
                return Phase(d['title'], d['initial_ticks'])
            if 'phases' in d and 'initial_repetitions' in d:
                # Use cls (not the hard-coded class) so subclasses get
                # instances of their own type.
                return cls(d["title"], d['phases'], d['initial_repetitions'])
            print("Wrong format")
            return d

        return json.loads(a_json_string, object_hook=custom_object_hook)

    @classmethod
    def default_json_string(cls) -> str:
        """Return the JSON description of the built-in default sequence."""
        return json.dumps({
            "title": "default",
            "phases": [
                {"title": "Tasking", "initial_ticks": 5},
                {"title": "Work", "initial_ticks": 45},
                {"title": "Break", "initial_ticks": 15},
            ],
            "initial_repetitions": 3,
        })

    @classmethod
    def default(cls):
        """Return the sequence described by ``default_json_string()``."""
        return cls.from_json(cls.default_json_string())

    def __init__(self, title, phases, repetitions):
        """Create a sequence positioned on its first phase.

        Args:
            title: Title of the sequence.
            phases: Non-empty list of phase objects, in execution order.
            repetitions: How often the whole list is run; must be > 0.

        Raises:
            AssertionError: If ``repetitions`` is not positive or ``phases``
                is empty.
        """
        assert repetitions > 0, "repetitions must be greater than 0"
        # `phases is not []` compared identity with a brand-new list and was
        # therefore always true; test for emptiness instead.
        assert phases, "phases must not be empty"
        self._title = title
        self._state = "initial"
        self.phases = phases
        self.current_phase = phases[0]
        self.initial_repetitions = repetitions
        # Full passes still to run once the current pass is finished.
        self.passes_left = repetitions - 1

    def to_json(self) -> str:
        """Serialize the instance attributes to a JSON string.

        Objects that ``json`` cannot encode natively are converted through
        their own ``to_json()`` method.

        NOTE(review): the keys are the raw attribute names (e.g. ``_title``),
        so the result is not in the format ``from_json`` accepts.
        """
        return json.dumps(self.__dict__, default=lambda each: each.to_json())

    @property
    def title(self):
        """Title of the sequence."""
        return self._title

    def __str__(self) -> str:
        return f"Sequence title:{self.title}\n{self.current_phase}"

    def is_sequence(self) -> bool:
        """Return True; identifies this object as a sequence."""
        return True

    def current_phase_number(self) -> int:
        """Return the zero-based position of the current phase in ``phases``."""
        return self.phases.index(self.current_phase)

    def phases_left_in_pass(self) -> int:
        """Return how many phases follow the current one in this pass."""
        return len(self.upcoming_phases_in_pass())

    def upcoming_phases_in_pass(self):
        """Return the phases after the current one (empty list on the last)."""
        # Slicing past the end already yields an empty list.
        return self.phases[self.current_phase_number() + 1:]

    @property
    def initial_ticks(self):
        """Sum of ``initial_ticks`` over all phases, i.e. one full pass."""
        return sum(each.initial_ticks for each in self.phases)

    @property
    def ticks_left(self):
        """Ticks remaining until the whole sequence is finished.

        Adds up the rest of the current phase, the upcoming phases of the
        current pass and ``passes_left`` further full passes.
        """
        # passes_left already excludes the current pass, so no extra "- 1".
        return (
            self.passes_left * self.initial_ticks
            + self.current_phase.ticks_left
            + sum(each.ticks_left for each in self.upcoming_phases_in_pass())
        )

    def state(self) -> str:
        """Return ``"completed"`` once finished, otherwise the stored state."""
        if self.is_completed():
            return "completed"
        return self._state

    def is_final_round(self) -> bool:
        """Return True while the last pass is running (prints a trace line)."""
        print("Final round", self.passes_left)
        return self.passes_left == 0

    def is_completed(self) -> bool:
        """Return True when the last phase of the last pass is completed."""
        return (
            self.passes_left < 1
            and not self.upcoming_phases_in_pass()
            and self.current_phase.is_completed()
        )

    def is_terminated(self) -> bool:
        """Return True when the sequence was aborted or has completed."""
        return self.is_aborted() or self.is_completed()

    def is_aborted(self) -> bool:
        """Return True when ``abort()`` has been called."""
        return self._state == 'aborted'

    def abort(self) -> None:
        """Abort the current phase and mark the sequence as aborted."""
        self.current_phase.abort()
        self._state = "aborted"

    def start(self) -> None:
        """Mark the sequence as running."""
        self._state = "running"

    def skip(self) -> None:
        """Leave the current phase early.

        Moves to the next phase of the pass, or to the first phase of the
        next pass.  Skipping the last phase of the final pass aborts the
        sequence instead.
        """
        upcoming = self.upcoming_phases_in_pass()
        if upcoming:
            self.current_phase.reset()
            self.current_phase = upcoming[0]
            return
        if self.is_final_round():
            print("Abort on final round")
            self.abort()
            return
        self.passes_left -= 1
        print("Skip", self.passes_left)
        self.current_phase.reset()
        self.current_phase = self.phases[0]

    def advance_to_next_phase(self) -> None:
        """Move on after the current phase has finished.

        Within a pass the next phase becomes current.  After the last phase
        either a new pass is started at the first phase or, on the final
        pass, the sequence is marked as completed.
        """
        current_index = self.current_phase_number()
        if current_index < len(self.phases) - 1:
            # Move to the next phase in the sequence.
            self.current_phase.reset()
            self.current_phase = self.phases[current_index + 1]
        elif self.passes_left < 1:
            # The final pass just ended.  Check *before* decrementing so that
            # all requested repetitions are run, consistent with skip().
            # The last phase is deliberately not reset: is_completed() relies
            # on it staying completed.
            self._state = "completed"
            print("Passes left", self.passes_left)
        else:
            # Completed a full pass with more to go: restart at the first
            # phase.
            self.passes_left -= 1
            self.current_phase.reset()
            self.current_phase = self.phases[0]

    def tick(self, ticks) -> None:
        """Forward ``ticks`` to the current phase, advancing when it finishes.

        Does nothing once the sequence is completed.  When the current phase
        completes, the sequence advances and the absolute value of what the
        phase's ``tick`` returned is passed on to the next phase.

        NOTE(review): presumably ``Phase.tick`` returns the (possibly
        negative) ticks left so that overshoot carries over - confirm
        against ``Phase``.
        """
        if self.is_completed():
            return
        result = self.current_phase.tick(ticks)
        if self.current_phase.is_completed():
            self.advance_to_next_phase()
            self.tick(abs(result))

    def unrolled(self):
        """Return independent deep copies of all phases, repeated per pass.

        The list contains ``initial_repetitions * len(phases)`` entries in
        execution order.
        """
        return [deepcopy(phase)
                for phase in self.initial_repetitions * self.phases]