Skip to content

core / task / task_manager

core.task.task_manager

TaskManager

Source code in core\task\task_manager.py
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
class TaskManager:
    def __init__(
        self,
        task_planner: TaskPlanner,
        triggers: TriggerQueue,
        db_interface: DatabaseInterface,
        event_stream_manager: EventStreamManager,
        state_manager: StateManager,
    ):
        """
        Coordinate task lifecycle management, including planning, execution,
        persistence, and event logging.

        The manager keeps an in-memory map of active :class:`Task` objects,
        persists changes to the database, synchronizes the state manager, and
        pushes triggers to the runtime queue to drive execution.

        Args:
            task_planner: Planner responsible for generating and updating step
                plans from high-level instructions.
            triggers: Queue used to schedule next actions for execution.
            db_interface: Persistence layer for task and step status updates.
            event_stream_manager: Event stream hub for user-visible progress
                logging.
            state_manager: In-memory state tracker for sharing task context
                with other components.
        """
        self.task_planner = task_planner
        self.triggers = triggers
        self.db_interface = db_interface
        self.event_stream_manager = event_stream_manager
        self.state_manager = state_manager
        self.active: Optional[Task] = None
        self.workspace_root = Path(AGENT_WORKSPACE_ROOT)

    def reset(self) -> None:
        """Clear all active tasks and detach any session-linked state."""
        self.active = None

    # ─────────────────────── Creation ─────────────────────────────────
    async def create_task(self, task_name: str, task_instruction: str) -> str:
        """
        Generate a new task plan and register it as active.

        The planner is invoked to break down the requested task into steps. A
        temporary workspace is provisioned, the plan is normalized into a
        :class:`Task`, and the result is recorded in the database and event
        stream. If planning fails, a minimal placeholder step is created so the
        task can still be surfaced.

        Args:
            task_name: Human-readable identifier supplied by the caller.
            task_instruction: Free-form description of the work to be
                completed.

        Returns:
            The unique identifier assigned to the created task.
        """
        task_id = f"{task_name}_{uuid.uuid4().hex[:6]}"
        task_id = self._sanitize_task_id(task_id)
        plan_json = await self.task_planner.plan_task(task_name, task_instruction)
        temp_dir = self._prepare_task_temp_dir(task_id)

        raw_task: Dict[str, Any] = {}
        try:
            raw_task = json.loads(plan_json)
            raw_steps = raw_task.get("steps")
            if not isinstance(raw_steps, list):
                raise ValueError("plan must be list")
            steps: List[Step] = [
                Step(
                    step_index=i,
                    step_name=st.get("step_name", "Default name value"),
                    description=st.get("description", "Default description value"),
                    action_instruction=st.get("action_instruction", ""),
                    validation_instruction=st.get("validation_instruction", ""),
                    status=st.get("status", "pending"),
                    failure_message=st.get("failure_message"),
                )
                for i, st in enumerate(raw_steps)
            ]
        except Exception as e:
            logger.error(f"[TaskManager] invalid plan – {e}")
            raw_task = {
                "goal": None,
                "inputs_params": None,
                "context": None,
            }
            steps = [Step(step_index=0, step_name="Send Message", description="Plan generation failed", action_instruction="", validation_instruction="", status="failed")]

        # Ensure a current step exists
        if steps and all(s.status != "current" for s in steps):
            first_pending = next((s for s in steps if s.status == "pending"), None)
            if first_pending:
                first_pending.status = "current"

        wf = Task(
            id=task_id,
            name=task_name,
            instruction=task_instruction,
            goal=raw_task.get("goal"),
            inputs_params=raw_task.get("inputs_params"),
            context=raw_task.get("context"),
            steps=steps,
            temp_dir=str(temp_dir),
        )
        self.active = wf
        self.db_interface.log_task(wf)
        self._sync_state_manager(wf)
        logger.debug(f"[TaskManager] Task {task_id} with {len(steps)} steps created")

        self.event_stream_manager.event_stream.temp_dir=temp_dir

        logger.debug("LOGGGING TO EVENT STREAM")
        self.event_stream_manager.log(
            "task",
            f"Created task: '{task_name}' with instruction: '{task_instruction}'.",
            display_message=f"Task created → {task_name}",
        )

        return task_id

    # ─────────────────────── Public: plan update helper ───────────────────────
    async def update_task_plan(
        self,
        event_stream: str,
        advance_next: bool = False,
    ) -> Tuple[Optional[str], Optional[Step]]:
        wf = self.active
        if not wf:
            logger.warning(f"[TaskManager] No active task found")
            return None, None

        updated_plan_json = await self.task_planner.update_plan(
            task_instruction=wf.instruction,
            task_plan=wf,
            event_stream=event_stream,
            advance_next=advance_next,
        )

        raw_task: Dict[str, Any] = {}
        try:
            raw_task = json.loads(updated_plan_json)
            raw_steps = raw_task.get("steps")
            if not isinstance(raw_steps, list):
                raise ValueError("plan must be list")
            steps: List[Step] = [
                Step(
                    step_index=i,
                    step_name=st.get("step_name", "Default name value"),
                    description=st.get("description", "Default description value"),
                    action_instruction=st.get("action_instruction", ""),
                    validation_instruction=st.get("validation_instruction", ""),
                    status=st.get("status", "pending"),
                    failure_message=st.get("failure_message"),
                )
                for i, st in enumerate(raw_steps)
            ]
        except Exception as e:
            logger.error(f"[TaskManager] invalid plan – {e}")
            raw_task = {
                "goal": None,
                "inputs_params": None,
                "context": None,
            }
            steps = [Step(step_index=0, step_name="chat", description="Plan generation failed", action_instruction="", validation_instruction="", status="failed")]

        # Ensure a current step exists
        if steps and all(s.status != "current" for s in steps):
            first_pending = next((s for s in steps if s.status == "pending"), None)
            if first_pending:
                first_pending.status = "current"

        updated_wf = Task(
            id=wf.id,
            name=wf.name,
            instruction=wf.instruction,
            goal=raw_task.get("goal"),
            inputs_params=raw_task.get("inputs_params"),
            context=raw_task.get("context"),
            steps=steps,
            temp_dir=str(wf.temp_dir),
        )
        self.active = updated_wf
        self.db_interface.log_task(updated_wf)
        self._sync_state_manager(updated_wf)
        logger.debug(f"[TaskManager] Task {wf.id} with {len(steps)} steps created")

        new_current_step = next((s for s in wf.steps if s.status == "current"), None)

        if new_current_step:
            if not new_current_step.action_id:
                new_current_step.action_id = str(uuid.uuid4())
            self.event_stream_manager.log(
                "task",
                f"Running new step: '{new_current_step.step_name}' – {new_current_step.description}",
                display_message=f"Running new step: '{new_current_step.step_name}' – {new_current_step.description}",
            )
            self.db_interface.log_task(wf)
            self._sync_state_manager(wf)

        return new_current_step

    # ─────────────────────── Start execution ──────────────────────────────────
    async def start_task(self) -> Dict[str, Any]:
        wf = self.active
        if not wf:
            return {"error": "task_not_found"}

        step = await self._ensure_and_log_current_step(wf)
        if not step:
            return {"error": "no_current_step"}

        await self.triggers.put(
            Trigger(
                fire_at=time.time(),
                priority=5,
                next_action_description=step.description,
                session_id=wf.id,
                payload={
                    "parent_action_id": step.action_id,
                },
            )
        )

        self.event_stream_manager.log(
            "task", 
            f"Running task step: '{step.step_name}' – {step.description}",
            display_message=f"Running task step: '{step.step_name}' – {step.description}"
        )
        logger.debug(f"[TaskManager] Step {step.step_name} queued ({wf.id})")
        return {"status": "queued", "step": step.step_name}

    # ─────────────────────── New tool-able controls ───────────────────────────
    async def mark_task_completed(self, message: Optional[str] = None) -> bool:
        wf = self.active
        if not wf:
            return False
        # finalize current step as completed if it's still open
        await self._finalize_current_step(wf, terminal_status="completed", message=message)
        await self._end_task(wf, status="completed", note=message)
        return True

    async def mark_task_error(self, message: Optional[str] = None) -> bool:
        wf = self.active
        if not wf:
            return False
        await self._finalize_current_step(wf, terminal_status="failed", message=message)
        await self._end_task(wf, status="error", note=message)
        return True

    async def mark_task_cancel(self, reason: Optional[str] = None) -> bool:
        wf = self.active
        if not wf:
            return False
        # mark all non-terminal steps as cancelled
        for st in wf.steps:
            if st.status in ("pending", "current"):
                st.status = "cancelled"
        await self._finalize_current_step(wf, terminal_status="cancelled", message=reason)
        await self._end_task(wf, status="cancelled", note=reason)
        return True

    async def start_next_step(
        self,
        replan: bool = False,
    ) -> Dict[str, Any]:
        """
        Finalize the current step as 'completed' and move to the next step.
        If replan=True, ask the planner to update the plan and advance; the
        resulting 'current' step (which may be newly created) will be used.

        Args:
            task_id: Identifier of the active task being advanced.
            replan: Whether to request a fresh plan update before selecting the
                next step.

        Returns:
            A status payload describing the next action (queued, completed, or
            no_next_step) or an error if the task is unknown.
        """
        wf = self.active
        if not wf:
            return {"error": "task_not_found"}

        # 1) finalize the current step as completed
        await self._finalize_current_step(wf, terminal_status="completed")

        new_current: Optional[Step] = None

        if replan:
            # 2a) replan and ask to advance
            new_current = await self.update_task_plan(
                event_stream="",           # no external event payload at this entry point
                advance_next=True,         # explicit baton move
            )
        else:
            # 2b) no replan: promote the next pending step
            #    (first non-terminal step after the previously current one)
            new_current = next((s for s in wf.steps if s.status == "pending"), None)
            if new_current:
                new_current.status = "current"

        # 3) if no new current, we may be done
        if not new_current:
            # If the last step is terminal and last is completed/skipped, we can auto-complete
            if wf.steps and wf.steps[-1].status in ("completed", "skipped"):
                await self._end_task(wf, status="completed", note="Auto-completed after last step")
                return {"status": "completed"}
            return {"status": "no_next_step"}

        # 4) ensure action row and enqueue trigger
        await self._ensure_and_log_current_step(wf)
        self.db_interface.log_task(wf)
        self._sync_state_manager(wf)
        STATE.set_agent_property("current_step_index", new_current.step_index)
        return {"status": "queued", "step": new_current.step_name}

    # ─────────────────────── Internal helpers ─────────────────────────────────
    async def _ensure_and_log_current_step(self, wf: Task) -> Optional[Step]:
        """Ensure there is a current step, promote the first pending if needed, and persist the change."""
        step = wf.get_current_step()
        if not step:
            return None

        updated = False
        if step.status != "current":
            step.status = "current"
            updated = True
        if not step.action_id:
            step.action_id = str(uuid.uuid4())
            updated = True

        if updated:
            self.db_interface.log_task(wf)
        self._sync_state_manager(wf)
        return step

    async def _finalize_current_step(self, wf: Task, terminal_status: str, message: Optional[str] = None) -> None:
        step = next((s for s in wf.steps if s.status == "current"), None)
        if not step:
            return
        step.status = terminal_status
        if message:
            step.failure_message = message
        self.db_interface.log_task(wf)
        self._sync_state_manager(wf)

    async def _end_task(self, wf: Task, status: str, note: Optional[str]) -> None:
        wf.status = status
        self.db_interface.log_task(wf)
        self._sync_state_manager(wf)
        self.event_stream_manager.log(
            "task",
            f"Task ended with status '{status}'. {note or ''}",
            display_message=f"Task {wf.name}{status}",
        )
        STATE.set_agent_property("current_task_id", "")
        STATE.set_agent_property("action_count", 0)
        STATE.set_agent_property("token_count", 0)
        # purge any queued triggers for the session
        try:
            await self.triggers.remove_sessions([wf.id])
        except Exception:
            logger.warning(f"[TaskManager] Failed to purge triggers for {wf.id}")
        # remove from active memory
        self.active = None
        if self.state_manager:
            self.state_manager.remove_active_task()
        if status == "completed":
            self._cleanup_task_temp_dir(wf)

    def get_task(self) -> Optional[Task]:
        return self.active

    def _sync_state_manager(self, wf: Optional[Task]) -> None:
        if not self.state_manager:
            return
        self.state_manager.add_to_active_task(task=wf)

    def _prepare_task_temp_dir(self, task_id: str) -> Path:
        temp_root = self.workspace_root / "tmp"
        temp_root.mkdir(parents=True, exist_ok=True)
        task_temp_dir = temp_root / task_id
        task_temp_dir.mkdir(parents=True, exist_ok=True)
        return task_temp_dir

    def _cleanup_task_temp_dir(self, wf: Task) -> None:
        if not wf.temp_dir:
            return
        try:
            shutil.rmtree(wf.temp_dir, ignore_errors=True)
            logger.debug("[TaskManager] Cleaned up temp dir for task %s", wf.id)
        except Exception:
            logger.warning("[TaskManager] Failed to clean temp dir for %s", wf.id, exc_info=True)

    # ─────────────────────── helper function ───────────────────────────────────
    def _sanitize_task_id(self, s: str) -> str:
        s = s.strip()
        s = re.sub(r"[^A-Za-z0-9._-]+", "_", s)  # replace invalid chars with _
        s = re.sub(r"_+", "_", s)               # collapse repeats
        return s.strip("._-") or "task"

__init__(task_planner, triggers, db_interface, event_stream_manager, state_manager)

Coordinate task lifecycle management, including planning, execution, persistence, and event logging.

The manager keeps an in-memory map of active :class:Task objects, persists changes to the database, synchronizes the state manager, and pushes triggers to the runtime queue to drive execution.

Parameters:

Name Type Description Default
task_planner TaskPlanner

Planner responsible for generating and updating step plans from high-level instructions.

required
triggers TriggerQueue

Queue used to schedule next actions for execution.

required
db_interface DatabaseInterface

Persistence layer for task and step status updates.

required
event_stream_manager EventStreamManager

Event stream hub for user-visible progress logging.

required
state_manager StateManager

In-memory state tracker for sharing task context with other components.

required
Source code in core\task\task_manager.py
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
def __init__(
    self,
    task_planner: TaskPlanner,
    triggers: TriggerQueue,
    db_interface: DatabaseInterface,
    event_stream_manager: EventStreamManager,
    state_manager: StateManager,
):
    """
    Coordinate task lifecycle management, including planning, execution,
    persistence, and event logging.

    The manager keeps an in-memory map of active :class:`Task` objects,
    persists changes to the database, synchronizes the state manager, and
    pushes triggers to the runtime queue to drive execution.

    Args:
        task_planner: Planner responsible for generating and updating step
            plans from high-level instructions.
        triggers: Queue used to schedule next actions for execution.
        db_interface: Persistence layer for task and step status updates.
        event_stream_manager: Event stream hub for user-visible progress
            logging.
        state_manager: In-memory state tracker for sharing task context
            with other components.
    """
    self.task_planner = task_planner
    self.triggers = triggers
    self.db_interface = db_interface
    self.event_stream_manager = event_stream_manager
    self.state_manager = state_manager
    self.active: Optional[Task] = None
    self.workspace_root = Path(AGENT_WORKSPACE_ROOT)

reset()

Clear all active tasks and detach any session-linked state.

Source code in core\task\task_manager.py
53
54
55
def reset(self) -> None:
    """Clear all active tasks and detach any session-linked state."""
    self.active = None

create_task(task_name, task_instruction) async

Generate a new task plan and register it as active.

The planner is invoked to break down the requested task into steps. A temporary workspace is provisioned, the plan is normalized into a :class:Task, and the result is recorded in the database and event stream. If planning fails, a minimal placeholder step is created so the task can still be surfaced.

Parameters:

Name Type Description Default
task_name str

Human-readable identifier supplied by the caller.

required
task_instruction str

Free-form description of the work to be completed.

required

Returns:

Type Description
str

The unique identifier assigned to the created task.

Source code in core\task\task_manager.py
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
async def create_task(self, task_name: str, task_instruction: str) -> str:
    """
    Generate a new task plan and register it as active.

    The planner is invoked to break down the requested task into steps. A
    temporary workspace is provisioned, the plan is normalized into a
    :class:`Task`, and the result is recorded in the database and event
    stream. If planning fails, a minimal placeholder step is created so the
    task can still be surfaced.

    Args:
        task_name: Human-readable identifier supplied by the caller.
        task_instruction: Free-form description of the work to be
            completed.

    Returns:
        The unique identifier assigned to the created task.
    """
    task_id = f"{task_name}_{uuid.uuid4().hex[:6]}"
    task_id = self._sanitize_task_id(task_id)
    plan_json = await self.task_planner.plan_task(task_name, task_instruction)
    temp_dir = self._prepare_task_temp_dir(task_id)

    raw_task: Dict[str, Any] = {}
    try:
        raw_task = json.loads(plan_json)
        raw_steps = raw_task.get("steps")
        if not isinstance(raw_steps, list):
            raise ValueError("plan must be list")
        steps: List[Step] = [
            Step(
                step_index=i,
                step_name=st.get("step_name", "Default name value"),
                description=st.get("description", "Default description value"),
                action_instruction=st.get("action_instruction", ""),
                validation_instruction=st.get("validation_instruction", ""),
                status=st.get("status", "pending"),
                failure_message=st.get("failure_message"),
            )
            for i, st in enumerate(raw_steps)
        ]
    except Exception as e:
        logger.error(f"[TaskManager] invalid plan – {e}")
        raw_task = {
            "goal": None,
            "inputs_params": None,
            "context": None,
        }
        steps = [Step(step_index=0, step_name="Send Message", description="Plan generation failed", action_instruction="", validation_instruction="", status="failed")]

    # Ensure a current step exists
    if steps and all(s.status != "current" for s in steps):
        first_pending = next((s for s in steps if s.status == "pending"), None)
        if first_pending:
            first_pending.status = "current"

    wf = Task(
        id=task_id,
        name=task_name,
        instruction=task_instruction,
        goal=raw_task.get("goal"),
        inputs_params=raw_task.get("inputs_params"),
        context=raw_task.get("context"),
        steps=steps,
        temp_dir=str(temp_dir),
    )
    self.active = wf
    self.db_interface.log_task(wf)
    self._sync_state_manager(wf)
    logger.debug(f"[TaskManager] Task {task_id} with {len(steps)} steps created")

    self.event_stream_manager.event_stream.temp_dir=temp_dir

    logger.debug("LOGGGING TO EVENT STREAM")
    self.event_stream_manager.log(
        "task",
        f"Created task: '{task_name}' with instruction: '{task_instruction}'.",
        display_message=f"Task created → {task_name}",
    )

    return task_id

start_next_step(replan=False) async

Finalize the current step as 'completed' and move to the next step. If replan=True, ask the planner to update the plan and advance; the resulting 'current' step (which may be newly created) will be used.

Parameters:

Name Type Description Default
task_id

Identifier of the active task being advanced.

required
replan bool

Whether to request a fresh plan update before selecting the next step.

False

Returns:

Type Description
Dict[str, Any]

A status payload describing the next action (queued, completed, or

Dict[str, Any]

no_next_step) or an error if the task is unknown.

Source code in core\task\task_manager.py
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
async def start_next_step(
    self,
    replan: bool = False,
) -> Dict[str, Any]:
    """
    Finalize the current step as 'completed' and move to the next step.
    If replan=True, ask the planner to update the plan and advance; the
    resulting 'current' step (which may be newly created) will be used.

    Args:
        task_id: Identifier of the active task being advanced.
        replan: Whether to request a fresh plan update before selecting the
            next step.

    Returns:
        A status payload describing the next action (queued, completed, or
        no_next_step) or an error if the task is unknown.
    """
    wf = self.active
    if not wf:
        return {"error": "task_not_found"}

    # 1) finalize the current step as completed
    await self._finalize_current_step(wf, terminal_status="completed")

    new_current: Optional[Step] = None

    if replan:
        # 2a) replan and ask to advance
        new_current = await self.update_task_plan(
            event_stream="",           # no external event payload at this entry point
            advance_next=True,         # explicit baton move
        )
    else:
        # 2b) no replan: promote the next pending step
        #    (first non-terminal step after the previously current one)
        new_current = next((s for s in wf.steps if s.status == "pending"), None)
        if new_current:
            new_current.status = "current"

    # 3) if no new current, we may be done
    if not new_current:
        # If the last step is terminal and last is completed/skipped, we can auto-complete
        if wf.steps and wf.steps[-1].status in ("completed", "skipped"):
            await self._end_task(wf, status="completed", note="Auto-completed after last step")
            return {"status": "completed"}
        return {"status": "no_next_step"}

    # 4) ensure action row and enqueue trigger
    await self._ensure_and_log_current_step(wf)
    self.db_interface.log_task(wf)
    self._sync_state_manager(wf)
    STATE.set_agent_property("current_step_index", new_current.step_index)
    return {"status": "queued", "step": new_current.step_name}