Skip to content

core / action / action_router

core.action.action_router

core.action.action_router

The agent uses this module to select actions based on the plan.

@author: zfoong

ActionRouter

Selects actions based on user queries, with an LLM verifying correctness or creating new actions on the fly.

Source code in core\action\action_router.py
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
class ActionRouter:
    """
    Chooses the next action by asking an LLM to pick among candidate actions
    taken from the action library.

    Outside a task only a fixed set of conversation actions is offered
    (``select_action``). Inside a task the candidates are the library's
    default actions plus search hits for the query (``select_action_in_task``);
    there, an empty ``action_name`` in the decision means a new action should
    be created by the caller.
    """

    def __init__(self, action_library: ActionLibrary, llm_interface, context_engine: ContextEngine):
        """
        Initialize the router responsible for selecting or creating actions.

        Args:
            action_library: Repository for storing and retrieving action definitions.
            llm_interface: LLM client used to reason about which action to run.
            context_engine: Provider of system prompts and context formatting.
        """
        self.action_library = action_library
        self.llm_interface = llm_interface
        self.context_engine = context_engine

    async def select_action(
        self,
        query: str,
        action_type: Optional[str] = None,
    ) -> Dict[str, Any]:
        """
        Select an action when no task is running.

        Only a fixed set of conversation-mode actions is offered to the LLM:
        "send message", "ask question", "create and start task" and "ignore".
        Names that cannot be retrieved from the action library are skipped.
        No library search is performed here.

        Args:
            query: User's request that should be satisfied by an action.
            action_type: Accepted for interface compatibility; not used by
                this method.

        Returns:
            Dict[str, Any]: The parsed LLM decision. ``parameters`` is always
            a dict (empty when the LLM supplied none or a non-dict value).

        Raises:
            ValueError: If the LLM response cannot be parsed after all retries.
        """
        conversation_mode_actions = ["send message", "ask question", "create and start task", "ignore"]
        action_candidates = []

        for action_name in conversation_mode_actions:
            act = self.action_library.retrieve_action(action_name=action_name)
            if act:
                action_candidates.append(self._describe_action(act))

        # Build the instruction prompt for the LLM
        prompt = SELECT_ACTION_PROMPT.format(
            query=query,
            action_candidates=self._format_candidates(action_candidates),
        )

        decision = await self._prompt_for_decision(prompt)

        logger.debug(
            f"Action router selected action={decision.get('action_name')} "
            f"with parameters={decision.get('parameters')}"
        )

        return decision

    async def select_action_in_task(
        self,
        query: str,
        action_type: Optional[str] = None,
        GUI_mode: bool = False,
        reasoning: str = "",
    ) -> Dict[str, Any]:
        """
        Select an action while a task is running.

        1. Collects the library's default actions and the top 5 search hits
           for ``query``, dropping "create and start task", "ignore" and any
           action not visible in the current mode. Each action is offered once.
        2. Asks the LLM to pick one of the candidates.
        3. If the LLM returns an empty action name, the decision is returned
           as-is so the outer loop can create a new action.
        4. Otherwise the chosen name must resolve to a visible library action;
           if not, the LLM is asked again (up to 3 attempts) with feedback
           naming the rejected choice.

        Args:
            query: Task-level instruction for the next step.
            action_type: Accepted for interface compatibility; not used by
                this method.
            GUI_mode: Mode flag passed to ``_is_visible_in_mode`` to filter
                out actions that are not visible in the current mode.
            reasoning: Reasoning text embedded in the prompt.

        Returns:
            Dict[str, Any]: Decision payload with ``action_name`` and
            normalized ``parameters`` for execution, or an empty ``action_name``
            when a new action should be created.

        Raises:
            ValueError: If the LLM response cannot be parsed, or if no valid
                action name is returned after all retries.
        """
        candidates_by_name = self._gather_task_candidates(query, GUI_mode)
        action_candidates = list(candidates_by_name.values())
        action_name_candidates = list(candidates_by_name)

        # Build the instruction prompt for the LLM
        prompt = SELECT_ACTION_IN_TASK_PROMPT.format(
            query=query,
            reasoning=self._format_reasoning(reasoning),
            action_candidates=self._format_candidates(action_candidates),
            action_name_candidates=self._format_action_names(action_name_candidates),
        )

        max_retries = 3
        current_prompt = prompt
        for attempt in range(max_retries):
            decision = await self._prompt_for_decision(current_prompt, is_task=True)

            selected_action_name = decision.get("action_name", "")
            if selected_action_name == "":
                # Empty name: the caller is expected to create a new action.
                return decision

            selected_action = self.action_library.retrieve_action(selected_action_name)
            if selected_action is not None and _is_visible_in_mode(selected_action, GUI_mode):
                decision["parameters"] = self._ensure_parameters(decision.get("parameters"))
                return decision

            logger.warning(
                f"Received invalid action name '{selected_action_name}' during selection attempt {attempt + 1}"
            )
            # Tell the LLM what was wrong; re-sending the identical prompt
            # gives it no reason to answer differently.
            current_prompt = self._augment_prompt_with_invalid_action(
                prompt, attempt + 1, selected_action_name, action_name_candidates
            )

        # If we fail to find a valid action name after the retries, raise an error
        raise ValueError("Invalid selected action returned by LLM after retries.")

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _gather_task_candidates(self, query: str, GUI_mode: bool) -> Dict[str, Dict[str, Any]]:
        """
        Collect in-task candidates keyed by action name.

        Default actions come first, then search hits. Keying by name means an
        action that is both a default action and a search hit is offered only
        once; dict insertion order preserves the original ordering.
        """
        # Actions that are filtered out while a task is running
        ignore_actions = ["create and start task", "ignore"]
        candidates_by_name: Dict[str, Dict[str, Any]] = {}

        def offer(act) -> None:
            if act.name in ignore_actions:
                return
            if not _is_visible_in_mode(act, GUI_mode):
                return
            if act.name not in candidates_by_name:
                candidates_by_name[act.name] = self._describe_action(act)

        # Retrieve default actions (could be multiple)
        for act in self.action_library.retrieve_default_action():
            offer(act)

        # Additional candidate actions from search
        candidate_names = self.action_library.search_action(query, top_k=5)
        logger.info(f"ActionRouter found candidate actions: {candidate_names}")
        for name in candidate_names:
            act = self.action_library.retrieve_action(name)
            if act:
                offer(act)

        return candidates_by_name

    async def _prompt_for_decision(self, prompt: str, is_task: bool = False) -> Dict[str, Any]:
        """
        Send ``prompt`` to the LLM and parse the reply into a decision dict.

        On a parse failure the LLM is asked again (up to 3 attempts) with the
        original prompt plus a feedback block quoting the failed reply.

        Args:
            prompt: User prompt to send.
            is_task: When True, ``agent_info`` is left out of the system prompt.

        Returns:
            Dict[str, Any]: Parsed decision whose ``parameters`` is a dict.

        Raises:
            ValueError: If no attempt produced a parseable dict.
        """
        max_retries = 3
        last_error: Optional[Exception] = None
        current_prompt = prompt
        for attempt in range(max_retries):
            # Rebuilt on every attempt, as in the original flow.
            system_prompt, _ = self.context_engine.make_prompt(
                user_flags={"query": False, "expected_output": False},
                system_flags={"agent_info": not is_task, "conversation_history": True, "event_stream": True, "task_state": True, "policy": False},
            )
            raw_response = await self.llm_interface.generate_response_async(system_prompt, current_prompt)
            decision, parse_error = self._parse_action_decision(raw_response)
            if decision is not None:
                decision["parameters"] = self._ensure_parameters(decision.get("parameters"))
                return decision

            feedback_error = parse_error or "unknown parsing error"
            last_error = ValueError(f"Unable to parse action decision on attempt {attempt + 1}: {feedback_error}")
            logger.warning(
                f"Failed to parse LLM decision on attempt {attempt + 1}: "
                f"{raw_response} | error={feedback_error}"
            )
            # Feedback is appended to the base prompt, not accumulated.
            current_prompt = self._augment_prompt_with_feedback(prompt, attempt + 1, raw_response, feedback_error)

        raise last_error or ValueError("Unable to parse LLM decision")

    def _parse_action_decision(self, raw: str) -> Tuple[Optional[Dict[str, Any]], Optional[str]]:
        """
        Parse an LLM reply into a dict.

        JSON is tried first, then a Python literal via ``ast.literal_eval``
        (which only evaluates literals, never arbitrary code). A surrounding
        Markdown code fence is removed before parsing. Non-string input is
        reported as a parse failure instead of raising.

        Returns:
            ``(decision, None)`` on success, or ``(None, error_message)`` when
            the reply cannot be parsed or is not a dict.
        """
        text = self._strip_code_fences(raw) if isinstance(raw, str) else raw
        try:
            parsed = json.loads(text)
        except (ValueError, TypeError) as json_error:
            # JSONDecodeError is a ValueError; TypeError covers non-string replies.
            try:
                parsed = ast.literal_eval(text)
            except Exception as eval_error:
                logger.error(f"Unable to parse action decision: {raw}")
                return None, f"json error: {json_error}; literal_eval error: {eval_error}"

        if not isinstance(parsed, dict):
            logger.error(f"Parsed action decision is not a dict: {raw}")
            return None, "parsed value is not a dictionary"

        return parsed, None

    @staticmethod
    def _strip_code_fences(text: str) -> str:
        """
        Return the content of a Markdown code fence, or ``text`` unchanged.

        Only text that starts with a fence line is touched, so replies that
        already parse are unaffected. If the fence is empty or sits on a
        single line, the original text is returned.
        """
        stripped = text.strip()
        if not stripped.startswith("```"):
            return text
        # Drop the opening fence line together with its optional language tag.
        lines = stripped.splitlines()[1:]
        if lines and lines[-1].strip() == "```":
            lines = lines[:-1]
        inner = "\n".join(lines).strip()
        return inner or text

    def _augment_prompt_with_feedback(
        self,
        base_prompt: str,
        attempt: int,
        raw_response: str,
        error_message: str,
    ) -> str:
        """Append a parse-failure feedback block, quoting the failed reply, to ``base_prompt``."""
        feedback_block = (
            f"\n\nPrevious attempt {attempt} failed to parse because: {error_message}. "
            "Review your last reply above (shown in the RAW RESPONSE section) and return a corrected response. "
            "You must return ONLY a JSON object with action_name and parameters fields. "
            "Do not include any additional commentary, code fences, or explanatory text.\n\n"
            "RAW RESPONSE:\n"
            f"{raw_response}\n"
            "--- End of RAW RESPONSE ---\n"
            "Respond now with the corrected JSON object."
        )
        return base_prompt + feedback_block

    def _augment_prompt_with_invalid_action(
        self,
        base_prompt: str,
        attempt: int,
        action_name: Any,
        valid_names: List[str],
    ) -> str:
        """Append feedback that ``action_name`` was rejected, listing the names that were offered."""
        feedback_block = (
            f"\n\nPrevious attempt {attempt} selected action_name '{action_name}', "
            "which is not an available action. "
            "Choose an action_name from the following list, or return an empty action_name "
            "if none of them fits and a new action must be created:\n"
            f"{self._format_action_names(valid_names)}"
        )
        return base_prompt + feedback_block

    @staticmethod
    def _describe_action(act: Any) -> Dict[str, Any]:
        """Build the candidate dict for a library action from its attributes."""
        return {
            "name": act.name,
            "description": act.description,
            "type": act.action_type,
            "input_schema": act.input_schema,
            "output_schema": act.output_schema,
        }

    @staticmethod
    def _schema_field_names(schema: Any) -> List[Any]:
        """Return the keys of a dict schema, the items of a list schema, else an empty list."""
        schema = schema or {}
        if isinstance(schema, dict):
            return list(schema.keys())
        if isinstance(schema, list):
            return list(schema)
        return []

    def _format_candidates(self, candidates: List[Dict[str, Any]]) -> str:
        """
        Serialize candidates to pretty-printed JSON for the prompt.

        Each entry keeps ``name``, ``description`` and the full
        ``input_schema``; ``output_schema`` is reduced to its field names and
        ``type`` is omitted. An empty list yields ``"[]"``.
        """
        if not candidates:
            return "[]"

        simplified: List[Dict[str, Any]] = [
            {
                "name": candidate.get("name"),
                "description": candidate.get("description"),
                "input_schema": candidate.get("input_schema"),
                "output_schema": self._schema_field_names(candidate.get("output_schema")),
            }
            for candidate in candidates
        ]

        return json.dumps(simplified, indent=2, ensure_ascii=False)

    def _format_action_names(self, names: List[str]) -> str:
        """Serialize action names to pretty-printed JSON; an empty list yields ``"[]"``."""
        if not names:
            return "[]"
        return json.dumps(names, indent=2, ensure_ascii=False)

    def _format_reasoning(self, context: str | list | dict | None) -> str:
        """Render reasoning for the prompt: ``""`` for None, JSON for list/dict, ``str()`` otherwise."""
        if context is None:
            return ""
        if isinstance(context, (list, dict)):
            return json.dumps(context, indent=2, ensure_ascii=False)
        return str(context)

    def _format_event_stream(self, event_stream: str | list | dict | None) -> str:
        """Render an event stream: a placeholder when empty, JSON for list/dict, ``str()`` otherwise."""
        if not event_stream:
            return "No prior events available."
        if isinstance(event_stream, (list, dict)):
            return json.dumps(event_stream, indent=2, ensure_ascii=False)
        return str(event_stream)

    def _ensure_parameters(self, parameters: Any) -> Dict[str, Any]:
        """Return ``parameters`` when it is a dict, otherwise an empty dict."""
        if isinstance(parameters, dict):
            return parameters
        return {}
__init__(action_library, llm_interface, context_engine)

Initialize the router responsible for selecting or creating actions.

Parameters:

Name Type Description Default
action_library ActionLibrary

Repository for storing and retrieving action definitions.

required
llm_interface

LLM client used to reason about which action to run.

required
context_engine ContextEngine

Provider of system prompts and context formatting.

required
Source code in core\action\action_router.py
47
48
49
50
51
52
53
54
55
56
57
58
def __init__(self, action_library: ActionLibrary, llm_interface, context_engine: ContextEngine):
    """
    Store the collaborators the router works with.

    The constructor only keeps references to its arguments; it calls
    nothing on them.

    Args:
        action_library: Repository for storing and retrieving action definitions.
        llm_interface: LLM client used to reason about which action to run.
        context_engine: Provider of system prompts and context formatting.
    """
    # Prompt construction and LLM access
    self.context_engine = context_engine
    self.llm_interface = llm_interface
    # Action lookup
    self.action_library = action_library

select_action(query, action_type=None) async

Default action selection function used when no task is running. For now, it only chooses between chat, ignore, or create and start task.

  1. Retrieves top-k candidate action names from ChromaDB.
  2. Builds a candidate list with searched and default action for the LLM.
  3. Asks the LLM if any candidate is valid, or if a new action is needed.
  4. If new action is needed, create & store it, then return it.
  5. Otherwise, return the chosen existing action with its parameters.

Parameters:

Name Type Description Default
query str

User's request that should be satisfied by an action.

required
action_type Optional[str]

Optional type filter forwarded to the LLM.

None
context

Additional conversational context to ground the prompt.

required

Returns:

Type Description
Dict[str, Any]

Parsed decision containing action_name and parameters ready for execution or creation.

Source code in core\action\action_router.py
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
async def select_action(
    self,
    query: str,
    action_type: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Pick an action when no task is running.

    The LLM is offered a fixed set of conversation-mode actions:
    "send message", "ask question", "create and start task" and "ignore".
    Each name is looked up in the action library and skipped when the lookup
    returns nothing. The formatted candidates and the query are placed into
    ``SELECT_ACTION_PROMPT`` and the LLM's parsed decision is returned.

    Args:
        query: User's request that should be satisfied by an action.
        action_type: Accepted by the signature; not used in this function.

    Returns:
        Dict[str, Any]: The decision produced by ``_prompt_for_decision``.
    """
    offered_names = ("send message", "ask question", "create and start task", "ignore")

    # Lazy lookups keep the original order: retrieve one action, describe it, move on.
    retrieved = (
        self.action_library.retrieve_action(action_name=name)
        for name in offered_names
    )
    action_candidates = [
        {
            "name": found.name,
            "description": found.description,
            "type": found.action_type,
            "input_schema": found.input_schema,
            "output_schema": found.output_schema,
        }
        for found in retrieved
        if found
    ]

    # Fill the prompt template for the LLM
    formatted_candidates = self._format_candidates(action_candidates)
    prompt = SELECT_ACTION_PROMPT.format(query=query, action_candidates=formatted_candidates)

    decision = await self._prompt_for_decision(prompt)

    chosen_name = decision.get('action_name')
    chosen_parameters = decision.get('parameters')
    logger.debug(
        f"Action router selected action={chosen_name} "
        f"with parameters={chosen_parameters}"
    )

    return decision

select_action_in_task(query, action_type=None, GUI_mode=False, reasoning='') async

When a task is running, this action selection will be used.

  1. Retrieves top-k candidate action names from ChromaDB.
  2. Builds a candidate list with searched and default action for the LLM.
  3. Asks the LLM if any candidate is valid, or if a new action is needed.
  4. If new action is needed, return an empty action name, and let the outer loop create the action.
  5. Otherwise, return the chosen existing action along with parameters.

Parameters:

Name Type Description Default
query str

Task-level instruction for the next step.

required
action_type Optional[str]

Optional action type hint supplied to the LLM.

None
GUI_mode

Whether the user is interacting through a GUI, affecting which actions are visible.

False
reasoning str

Reasoning text to embed in the prompt.

''

Returns:

Type Description
Dict[str, Any]

Decision payload with action_name and normalized parameters for execution, or an empty action_name when a new action should be created.

Source code in core\action\action_router.py
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
async def select_action_in_task(
    self,
    query: str,
    action_type: Optional[str] = None,
    GUI_mode=False,
    reasoning: str = "",
) -> Dict[str, Any]:
    """
    Select an action while a task is running.

    1. Collects the library's default actions and the top 5 search hits for
       ``query``, dropping "create and start task", "ignore" and any action
       not visible in the current mode. Each action is offered once.
    2. Asks the LLM to pick one of the candidates.
    3. If the LLM returns an empty action name, the decision is returned
       as-is so the outer loop can create a new action.
    4. Otherwise the chosen name must resolve to a visible library action;
       if not, the LLM is asked again (up to 3 attempts) with feedback
       naming the rejected choice.

    Args:
        query: Task-level instruction for the next step.
        action_type: Accepted by the signature; not used in this function.
        GUI_mode: Mode flag passed to ``_is_visible_in_mode`` to filter out
            actions that are not visible in the current mode.
        reasoning: Reasoning text embedded in the prompt.

    Returns:
        Dict[str, Any]: Decision payload with ``action_name`` and
        normalized ``parameters`` for execution, or an empty ``action_name``
        when a new action should be created.

    Raises:
        ValueError: If no valid action name is returned after all retries.
    """
    # Actions that are filtered out while a task is running
    ignore_actions = ["create and start task", "ignore"]

    # Keyed by name so an action that is both a default action and a search
    # hit is offered only once; dicts keep insertion order.
    candidates_by_name: Dict[str, Dict[str, Any]] = {}

    def offer(act) -> None:
        if act.name in ignore_actions:
            return
        if not _is_visible_in_mode(act, GUI_mode):
            return
        if act.name not in candidates_by_name:
            candidates_by_name[act.name] = {
                "name": act.name,
                "description": act.description,
                "type": act.action_type,
                "input_schema": act.input_schema,
                "output_schema": act.output_schema,
            }

    # Retrieve default actions (could be multiple)
    for act in self.action_library.retrieve_default_action():
        offer(act)

    # Additional candidate actions from search
    candidate_names = self.action_library.search_action(query, top_k=5)
    logger.info(f"ActionRouter found candidate actions: {candidate_names}")
    for name in candidate_names:
        act = self.action_library.retrieve_action(name)
        if act:
            offer(act)

    action_candidates = list(candidates_by_name.values())
    action_name_candidates = list(candidates_by_name)

    # Build the instruction prompt for the LLM
    prompt = SELECT_ACTION_IN_TASK_PROMPT.format(
        query=query,
        reasoning=self._format_reasoning(reasoning),
        action_candidates=self._format_candidates(action_candidates),
        action_name_candidates=self._format_action_names(action_name_candidates),
    )

    max_retries = 3
    current_prompt = prompt
    for attempt in range(max_retries):
        decision = await self._prompt_for_decision(current_prompt, is_task=True)

        selected_action_name = decision.get("action_name", "")
        if selected_action_name == "":
            # Empty name: the caller is expected to create a new action.
            return decision

        selected_action = self.action_library.retrieve_action(selected_action_name)
        if selected_action is not None and _is_visible_in_mode(selected_action, GUI_mode):
            decision["parameters"] = self._ensure_parameters(decision.get("parameters"))
            return decision

        logger.warning(
            f"Received invalid action name '{selected_action_name}' during selection attempt {attempt + 1}"
        )
        # Tell the LLM what was wrong; re-sending the identical prompt gives
        # it no reason to answer differently.
        current_prompt = prompt + (
            f"\n\nPrevious attempt {attempt + 1} selected action_name '{selected_action_name}', "
            "which is not an available action. "
            "Choose an action_name from the following list, or return an empty action_name "
            "if none of them fits and a new action must be created:\n"
            f"{self._format_action_names(action_name_candidates)}"
        )

    # If we fail to find a valid action name after the retries, raise an error
    raise ValueError("Invalid selected action returned by LLM after retries.")