Skip to content

core / trigger

core.trigger

core.trigger

Trigger in this framework is the entry point of ALL reactions by the agent.

TriggerQueue

Concurrency-safe priority queue for Trigger.

Source code in core\trigger.py
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
class TriggerQueue:
    """
    Concurrency-safe priority queue for Trigger.
    """

    def __init__(self, llm: LLMInterface) -> None:
        """
        Initialize a concurrency-safe trigger queue.

        The queue manages incoming :class:`Trigger` objects using a heap to
        preserve ordering by ``fire_at`` timestamp and priority. A shared
        :class:`asyncio.Condition` coordinates producers and consumers so agent
        loops can await triggers without busy waiting.

        Args:
            llm: Interface used to resolve conflicts between competing triggers
                for the same session.
        """
        self._heap: List[Trigger] = []
        self._cv = asyncio.Condition()
        self.llm = llm
    # =================================================================
    # Pretty Printer for Debugging
    # =================================================================
    def _print_queue(self, label: str) -> None:
        logger.debug("=" * 70)
        logger.debug(f"[TRIGGER QUEUE] {label}")
        logger.debug("=" * 70)

        if not self._heap:
            logger.debug("(empty)")
            return

        now = time.time()
        for i, t in enumerate(sorted(self._heap, key=lambda x: (x.fire_at, x.priority))):
            logger.debug(
                f"{i+1}. session_id={t.session_id} | "
                f"prio={t.priority} | "
                f"fire_at={t.fire_at:.6f} ({time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(t.fire_at))}) | "
                f"delta={t.fire_at - now:.2f}s\n"
                f"   desc={t.next_action_description}"
            )
        logger.debug("=" * 70 + "\n")

    def create_conversation_history_state(self):
        """Return formatted conversation history for trigger comparison."""
        conversation_state = STATE.conversation_state
        if conversation_state:
            return (
                "This is the conversation history (from oldest to newest messages):"
                f"\n{conversation_state}"
            )
        return "There is no stored conversation history for the current session yet."

    def create_event_stream_state(self):
        """Return formatted event stream content for trigger comparison."""
        event_stream = STATE.event_stream
        if event_stream:
            return (
                "Use the event stream to understand the current situation, past agent actions to craft the input parameters:\nEvent stream (oldest to newest):"
                f"\n{event_stream}"
            )
        return ""

    def create_task_state(self):
        """Return formatted task/plan context for trigger comparison."""
        current_task: Optional[Task] = STATE.current_task
        if current_task:
            current_task_dict: Dict[str, Any] = current_task.to_dict(fold=False, current_step_index=STATE.agent_properties.get_property("current_step_index"))
            return "The plan of the current on-going task:" + f"\n{json.dumps(current_task_dict, indent=4)}"
        return ""

    def create_system_agent_state(self):
        """Compose session context for trigger handling from component states."""

        sections = [
            self.create_conversation_history_state(),
            self.create_event_stream_state(),
            self.create_task_state(),
        ]

        return "\n\n".join([section for section in sections if section])

    async def clear(self) -> None:
        """
        Remove all pending triggers from the queue.

        The queue is cleared under the protection of the condition variable so
        waiting consumers are notified immediately that the queue state has
        changed.
        """
        async with self._cv:
            self._heap.clear()
            self._cv.notify_all()

    # =================================================================
    # PUT
    # =================================================================
    async def put(self, trig: Trigger) -> None:
        """
        Insert a trigger into the queue, merging with existing session triggers.

        When a trigger arrives for a session that already has queued work, the
        method consults the LLM to generate a new session identifier that
        represents the preferred trigger. Existing triggers for that session
        are removed so the freshest trigger wins.

        Args:
            trig: Trigger instance describing when and why the agent should act.
        """
        logger.debug(f"\n[PUT] Incoming trigger for session={trig.session_id}")
        self._print_queue("BEFORE PUT")

        existing_triggers: List[Trigger] = self._heap

        if len(existing_triggers) > 0:

            sys_msg = f"Existing Context: {self.create_system_agent_state()}"
            # If heap empty → push directly
            if not existing_triggers:
                existing_triggers.append(trig)
            else:
                logger.debug("[TRIGGER QUEUE] Heap not empty → ignoring new trigger for LLM comparison")

            usr_msg = CHECK_TRIGGERS_STATE_PROMPT.format(
                context=trig,
                existing_triggers=existing_triggers,
            )

            new_trigger_id = await self.llm.generate_response_async(sys_msg, usr_msg)
            logger.debug(f"[PUT] New trigger value: {new_trigger_id}")

            # Update the incoming trigger's ID
            trig.session_id = new_trigger_id

        async with self._cv:
            # find all triggers in heap with same session_id
            same = [t for t in self._heap if t.session_id == trig.session_id]

            if same:
                logger.debug("[PUT] Existing trigger(s) found → PREFER NEW TRIGGER")
                self._print_queue("BEFORE REPLACE (PUT)")

                # Remove ALL old triggers for this session
                self._heap = [t for t in self._heap if t.session_id != trig.session_id]

                # NEW BEHAVIOUR: prefer new → push new trigger only
                heapq.heappush(self._heap, trig)

                logger.debug("[PUT] REPLACED old triggers with NEW trigger")
                self._print_queue("AFTER REPLACE (PUT)")

            else:
                logger.debug("[PUT] No existing session trigger → pushing normally")
                heapq.heappush(self._heap, trig)

            heapq.heapify(self._heap)

            self._print_queue("AFTER PUT")
            self._cv.notify()

    # =================================================================
    # GET
    # =================================================================
    async def get(self) -> Trigger:
        """
        Retrieve the next trigger to execute, waiting until one is ready.

        The method drains all triggers that are ready to fire, merges triggers
        belonging to the same session, and returns the highest-priority
        combined trigger. If no trigger is ready, it waits until either the
        earliest trigger's ``fire_at`` time arrives or a producer notifies the
        condition.

        Returns:
            The next merged :class:`Trigger` ready for execution.
        """
        logger.debug("\n[GET] CALLED")
        self._print_queue("QUEUE BEFORE GET")

        async with self._cv:
            while True:
                now = time.time()

                # collect ready triggers
                ready: List[Trigger] = []
                while self._heap and self._heap[0].fire_at <= now:
                    ready.append(heapq.heappop(self._heap))

                if ready:
                    logger.debug(f"[GET] {len(ready)} trigger(s) are ready")
                    self._print_queue("READY BEFORE MERGE (GET)")

                    merged_ready = self._merge_ready_triggers(ready)
                    merged_ready.sort(key=lambda t: (t.priority, t.fire_at))

                    trig = merged_ready.pop(0)
                    logger.info(
                        f"[TRIGGER FIRED] session={trig.session_id} | desc={trig.next_action_description}"
                    )

                    # requeue leftover
                    for t in merged_ready:
                        heapq.heappush(self._heap, t)

                    self._print_queue("QUEUE AFTER GET (POST-MERGE)")
                    return trig

                # wait for next trigger
                if self._heap:
                    next_fire = self._heap[0].fire_at
                    delay = next_fire - now
                    if delay <= 0:
                        continue
                    try:
                        await asyncio.wait_for(self._cv.wait(), timeout=delay)
                    except asyncio.TimeoutError:
                        continue
                else:
                    await self._cv.wait()

    # =================================================================
    # SIZE / LIST
    # =================================================================
    async def size(self) -> int:
        """
        Count how many triggers are currently queued.

        Returns:
            The number of triggers stored in the heap.
        """
        async with self._cv:
            return len(self._heap)

    async def list_triggers(self) -> List[Trigger]:
        """
        List the triggers currently in the queue without altering order.

        Returns:
            A shallow copy of the internal trigger heap contents.
        """
        async with self._cv:
            return list(self._heap)

    # =================================================================
    # FIRE NOW
    # =================================================================
    async def fire(self, session_id: str) -> bool:
        """
        Mark a trigger for a given session as ready to fire immediately.

        The ``fire_at`` timestamp for matching triggers is updated to the
        current time, and waiting consumers are notified.

        Args:
            session_id: Identifier of the session whose trigger should fire
                now.

        Returns:
            ``True`` if at least one trigger was updated, otherwise ``False``.
        """
        async with self._cv:
            found = False
            for t in self._heap:
                if t.session_id == session_id:
                    t.fire_at = time.time()
                    found = True
            if found:
                self._cv.notify()
            return found

    # =================================================================
    # REMOVE SESSIONS
    # =================================================================
    async def remove_sessions(self, session_ids: list[str]) -> None:
        """
        Remove all triggers that belong to the provided session identifiers.

        Args:
            session_ids: Sessions whose queued triggers should be discarded.
                An empty list leaves the queue unchanged.
        """
        if not session_ids:
            return
        async with self._cv:
            self._heap = [t for t in self._heap if t.session_id not in session_ids]
            heapq.heapify(self._heap)
            self._cv.notify_all()

    # =================================================================
    # MERGE HELPERS
    # =================================================================
    def _merge_ready_triggers(self, ready: List[Trigger]) -> List[Trigger]:
        grouped = defaultdict(list)
        for trig in ready:
            grouped[trig.session_id].append(trig)

        result = []
        for session_id, triggers in grouped.items():
            logger.debug(f"[MERGE READY] Merging {len(triggers)} triggers for session={session_id}")
            result.append(self._merge_trigger_group(session_id, triggers))

        return result

    def _merge_trigger_group(self, session_id: Optional[str], triggers: List[Trigger]) -> Trigger:
        logger.debug(f"[MERGE GROUP] session={session_id}, count={len(triggers)}")
        triggers.sort(key=lambda t: (t.priority, t.fire_at))

        combined_payload = {}
        combined_desc = OrderedDict()
        priority = triggers[0].priority
        fire_at = triggers[0].fire_at

        for trig in triggers:
            priority = min(priority, trig.priority)
            fire_at = min(fire_at, trig.fire_at)

            desc = (trig.next_action_description or "").strip()
            if desc and desc not in combined_desc:
                combined_desc[desc] = None

            combined_payload.update(trig.payload)

        merged_desc = "\n\n".join(combined_desc.keys()) or triggers[0].next_action_description

        merged = Trigger(
            fire_at=fire_at,
            priority=priority,
            next_action_description=merged_desc,
            payload=combined_payload,
            session_id=session_id,
        )

        logger.debug(f"[MERGE RESULT] session={session_id}, fire_at={fire_at}, priority={priority}")
        return merged

__init__(llm)

Initialize a concurrency-safe trigger queue.

The queue manages incoming :class:Trigger objects using a heap to preserve ordering by fire_at timestamp and priority. A shared :class:asyncio.Condition coordinates producers and consumers so agent loops can await triggers without busy waiting.

Parameters:

Name Type Description Default
llm LLMInterface

Interface used to resolve conflicts between competing triggers for the same session.

required
Source code in core\trigger.py
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
def __init__(self, llm: LLMInterface) -> None:
    """
    Initialize a concurrency-safe trigger queue.

    The queue manages incoming :class:`Trigger` objects using a heap to
    preserve ordering by ``fire_at`` timestamp and priority. A shared
    :class:`asyncio.Condition` coordinates producers and consumers so agent
    loops can await triggers without busy waiting.

    Args:
        llm: Interface used to resolve conflicts between competing triggers
            for the same session.
    """
    self._heap: List[Trigger] = []
    self._cv = asyncio.Condition()
    self.llm = llm

create_conversation_history_state()

Return formatted conversation history for trigger comparison.

Source code in core\trigger.py
76
77
78
79
80
81
82
83
84
def create_conversation_history_state(self):
    """Return formatted conversation history for trigger comparison."""
    conversation_state = STATE.conversation_state
    if conversation_state:
        return (
            "This is the conversation history (from oldest to newest messages):"
            f"\n{conversation_state}"
        )
    return "There is no stored conversation history for the current session yet."

create_event_stream_state()

Return formatted event stream content for trigger comparison.

Source code in core\trigger.py
86
87
88
89
90
91
92
93
94
def create_event_stream_state(self):
    """Return formatted event stream content for trigger comparison."""
    event_stream = STATE.event_stream
    if event_stream:
        return (
            "Use the event stream to understand the current situation, past agent actions to craft the input parameters:\nEvent stream (oldest to newest):"
            f"\n{event_stream}"
        )
    return ""

create_task_state()

Return formatted task/plan context for trigger comparison.

Source code in core\trigger.py
 96
 97
 98
 99
100
101
102
def create_task_state(self):
    """Return formatted task/plan context for trigger comparison."""
    current_task: Optional[Task] = STATE.current_task
    if current_task:
        current_task_dict: Dict[str, Any] = current_task.to_dict(fold=False, current_step_index=STATE.agent_properties.get_property("current_step_index"))
        return "The plan of the current on-going task:" + f"\n{json.dumps(current_task_dict, indent=4)}"
    return ""

create_system_agent_state()

Compose session context for trigger handling from component states.

Source code in core\trigger.py
104
105
106
107
108
109
110
111
112
113
def create_system_agent_state(self):
    """Compose session context for trigger handling from component states."""

    sections = [
        self.create_conversation_history_state(),
        self.create_event_stream_state(),
        self.create_task_state(),
    ]

    return "\n\n".join([section for section in sections if section])

clear() async

Remove all pending triggers from the queue.

The queue is cleared under the protection of the condition variable so waiting consumers are notified immediately that the queue state has changed.

Source code in core\trigger.py
115
116
117
118
119
120
121
122
123
124
125
async def clear(self) -> None:
    """
    Remove all pending triggers from the queue.

    The queue is cleared under the protection of the condition variable so
    waiting consumers are notified immediately that the queue state has
    changed.
    """
    async with self._cv:
        self._heap.clear()
        self._cv.notify_all()

put(trig) async

Insert a trigger into the queue, merging with existing session triggers.

When a trigger arrives for a session that already has queued work, the method consults the LLM to generate a new session identifier that represents the preferred trigger. Existing triggers for that session are removed so the freshest trigger wins.

Parameters:

Name Type Description Default
trig Trigger

Trigger instance describing when and why the agent should act.

required
Source code in core\trigger.py
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
async def put(self, trig: Trigger) -> None:
    """
    Insert a trigger into the queue, merging with existing session triggers.

    When a trigger arrives for a session that already has queued work, the
    method consults the LLM to generate a new session identifier that
    represents the preferred trigger. Existing triggers for that session
    are removed so the freshest trigger wins.

    Args:
        trig: Trigger instance describing when and why the agent should act.
    """
    logger.debug(f"\n[PUT] Incoming trigger for session={trig.session_id}")
    self._print_queue("BEFORE PUT")

    existing_triggers: List[Trigger] = self._heap

    if len(existing_triggers) > 0:

        sys_msg = f"Existing Context: {self.create_system_agent_state()}"
        # If heap empty → push directly
        if not existing_triggers:
            existing_triggers.append(trig)
        else:
            logger.debug("[TRIGGER QUEUE] Heap not empty → ignoring new trigger for LLM comparison")

        usr_msg = CHECK_TRIGGERS_STATE_PROMPT.format(
            context=trig,
            existing_triggers=existing_triggers,
        )

        new_trigger_id = await self.llm.generate_response_async(sys_msg, usr_msg)
        logger.debug(f"[PUT] New trigger value: {new_trigger_id}")

        # Update the incoming trigger's ID
        trig.session_id = new_trigger_id

    async with self._cv:
        # find all triggers in heap with same session_id
        same = [t for t in self._heap if t.session_id == trig.session_id]

        if same:
            logger.debug("[PUT] Existing trigger(s) found → PREFER NEW TRIGGER")
            self._print_queue("BEFORE REPLACE (PUT)")

            # Remove ALL old triggers for this session
            self._heap = [t for t in self._heap if t.session_id != trig.session_id]

            # NEW BEHAVIOUR: prefer new → push new trigger only
            heapq.heappush(self._heap, trig)

            logger.debug("[PUT] REPLACED old triggers with NEW trigger")
            self._print_queue("AFTER REPLACE (PUT)")

        else:
            logger.debug("[PUT] No existing session trigger → pushing normally")
            heapq.heappush(self._heap, trig)

        heapq.heapify(self._heap)

        self._print_queue("AFTER PUT")
        self._cv.notify()

get() async

Retrieve the next trigger to execute, waiting until one is ready.

The method drains all triggers that are ready to fire, merges triggers belonging to the same session, and returns the highest-priority combined trigger. If no trigger is ready, it waits until either the earliest trigger's fire_at time arrives or a producer notifies the condition.

Returns:

Type Description
Trigger

The next merged :class:Trigger ready for execution.

Source code in core\trigger.py
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
async def get(self) -> Trigger:
    """
    Retrieve the next trigger to execute, waiting until one is ready.

    The method drains all triggers that are ready to fire, merges triggers
    belonging to the same session, and returns the highest-priority
    combined trigger. If no trigger is ready, it waits until either the
    earliest trigger's ``fire_at`` time arrives or a producer notifies the
    condition.

    Returns:
        The next merged :class:`Trigger` ready for execution.
    """
    logger.debug("\n[GET] CALLED")
    self._print_queue("QUEUE BEFORE GET")

    async with self._cv:
        while True:
            now = time.time()

            # collect ready triggers
            ready: List[Trigger] = []
            while self._heap and self._heap[0].fire_at <= now:
                ready.append(heapq.heappop(self._heap))

            if ready:
                logger.debug(f"[GET] {len(ready)} trigger(s) are ready")
                self._print_queue("READY BEFORE MERGE (GET)")

                merged_ready = self._merge_ready_triggers(ready)
                merged_ready.sort(key=lambda t: (t.priority, t.fire_at))

                trig = merged_ready.pop(0)
                logger.info(
                    f"[TRIGGER FIRED] session={trig.session_id} | desc={trig.next_action_description}"
                )

                # requeue leftover
                for t in merged_ready:
                    heapq.heappush(self._heap, t)

                self._print_queue("QUEUE AFTER GET (POST-MERGE)")
                return trig

            # wait for next trigger
            if self._heap:
                next_fire = self._heap[0].fire_at
                delay = next_fire - now
                if delay <= 0:
                    continue
                try:
                    await asyncio.wait_for(self._cv.wait(), timeout=delay)
                except asyncio.TimeoutError:
                    continue
            else:
                await self._cv.wait()

size() async

Count how many triggers are currently queued.

Returns:

Type Description
int

The number of triggers stored in the heap.

Source code in core\trigger.py
256
257
258
259
260
261
262
263
264
async def size(self) -> int:
    """
    Count how many triggers are currently queued.

    Returns:
        The number of triggers stored in the heap.
    """
    async with self._cv:
        return len(self._heap)

list_triggers() async

List the triggers currently in the queue without altering order.

Returns:

Type Description
List[Trigger]

A shallow copy of the internal trigger heap contents.

Source code in core\trigger.py
266
267
268
269
270
271
272
273
274
async def list_triggers(self) -> List[Trigger]:
    """
    List the triggers currently in the queue without altering order.

    Returns:
        A shallow copy of the internal trigger heap contents.
    """
    async with self._cv:
        return list(self._heap)

fire(session_id) async

Mark a trigger for a given session as ready to fire immediately.

The fire_at timestamp for matching triggers is updated to the current time, and waiting consumers are notified.

Parameters:

Name Type Description Default
session_id str

Identifier of the session whose trigger should fire now.

required

Returns:

Type Description
bool

True if at least one trigger was updated, otherwise False.

Source code in core\trigger.py
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
async def fire(self, session_id: str) -> bool:
    """
    Mark a trigger for a given session as ready to fire immediately.

    The ``fire_at`` timestamp for matching triggers is updated to the
    current time, and waiting consumers are notified.

    Args:
        session_id: Identifier of the session whose trigger should fire
            now.

    Returns:
        ``True`` if at least one trigger was updated, otherwise ``False``.
    """
    async with self._cv:
        found = False
        for t in self._heap:
            if t.session_id == session_id:
                t.fire_at = time.time()
                found = True
        if found:
            self._cv.notify()
        return found

remove_sessions(session_ids) async

Remove all triggers that belong to the provided session identifiers.

Parameters:

Name Type Description Default
session_ids list[str]

Sessions whose queued triggers should be discarded. An empty list leaves the queue unchanged.

required
Source code in core\trigger.py
306
307
308
309
310
311
312
313
314
315
316
317
318
319
async def remove_sessions(self, session_ids: list[str]) -> None:
    """
    Remove all triggers that belong to the provided session identifiers.

    Args:
        session_ids: Sessions whose queued triggers should be discarded.
            An empty list leaves the queue unchanged.
    """
    if not session_ids:
        return
    async with self._cv:
        self._heap = [t for t in self._heap if t.session_id not in session_ids]
        heapq.heapify(self._heap)
        self._cv.notify_all()