Skip to content

core / database_interface

core.database_interface

core.database_interface

A filesystem-backed storage layer (plus ChromaDB), so the rest of the codebase never deals with persistence details directly.

DatabaseInterface

All persistence operations for the agent live here.

Source code in core\database_interface.py
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
class DatabaseInterface:
    """All persistence operations for the agent live here."""

    def __init__(
        self,
        *,
        data_dir: str = "core/data",
        chroma_path: str = "./chroma_db",
        log_file: Optional[str] = None,
    ) -> None:
        """
        Initialize storage directories and vector stores for agent data.

        The constructor sets up filesystem paths for logs, actions, task
        documents, and agent info. It also initializes separate Chroma
        collections for actions and task documents, ensuring they are synced
        with existing files on startup.

        Args:
            data_dir: Base directory used to persist logs and JSON artifacts.
            chroma_path: Root path for ChromaDB persistence; distinct suffixes
                are used for actions and task documents.
            log_file: Optional explicit log file path; defaults to
                ``<data_dir>/agent_logs.txt`` when omitted.
        """
        self.data_dir = Path(data_dir)
        self.data_dir.mkdir(parents=True, exist_ok=True)

        self.log_file_path = Path(log_file) if log_file else self.data_dir / "agent_logs.txt"
        self.actions_dir = self.data_dir / "action"
        self.task_docs_dir = self.data_dir / "task_document"
        self.agent_info_path = self.data_dir / "agent_info.json"


        self.actions_dir.mkdir(parents=True, exist_ok=True)
        self.task_docs_dir.mkdir(parents=True, exist_ok=True)
        self.log_file_path.touch(exist_ok=True)
        if not self.agent_info_path.exists():
            self.agent_info_path.write_text("{}", encoding="utf-8")


        # ChromaDB (for vector search on actions and task documents)
        self.chroma = chromadb.PersistentClient(path=f"{chroma_path}_actions")
        self.chroma_actions = self.chroma.get_or_create_collection("agent_actions")

        # separate ChromaDB client/collection for task documents
        self.chroma_taskdocs = chromadb.PersistentClient(path=f"{chroma_path}_taskdocs")
        self.chroma_taskdocs_coll = self.chroma_taskdocs.get_or_create_collection("task_documents")

        # Ensure Chroma stays in sync with the filesystem sources on startup
        self.sync_actions_to_chroma(paths_to_scan=[self.actions_dir])

        # Retrieve everything currently in the collection
        stored_data = self.chroma_actions.get()
        stored_ids = stored_data.get("ids", [])
        count = len(stored_ids)

        if count > 0:
            # Create a readable comma-separated string of action names
            actions_list_str = ", ".join(sorted(stored_ids))
            logger.info(f"✅ ChromaDB sync successful. Collection now holds {count} actions: [{actions_list_str}]")
        else:
                logger.warning("⚠️ ChromaDB sync completed, but the collection is empty.")

        self.sync_task_documents_to_chroma()

    # ------------------------------------------------------------------
    # Log helpers
    # ------------------------------------------------------------------
    def _load_log_entries(self) -> List[Dict[str, Any]]:
        entries: List[Dict[str, Any]] = []
        try:
            with self.log_file_path.open("r", encoding="utf-8") as handle:
                for line in handle:
                    line = line.strip()
                    if not line:
                        continue
                    try:
                        entries.append(json.loads(line))
                    except json.JSONDecodeError:
                        logger.warning(f"[LOG PARSE] Skipping malformed line in {self.log_file_path}")
        except FileNotFoundError:
            pass
        return entries

    def _write_log_entries(self, entries: Iterable[Dict[str, Any]]) -> None:
        with self.log_file_path.open("w", encoding="utf-8") as handle:
            for entry in entries:
                handle.write(json.dumps(entry, default=str) + "\n")

    def _append_log_entry(self, entry: Dict[str, Any]) -> None:
        with self.log_file_path.open("a", encoding="utf-8") as handle:
            handle.write(json.dumps(entry, default=str) + "\n")

    # ------------------------------------------------------------------
    # Prompt logging & token usage helpers
    # ------------------------------------------------------------------
    def log_prompt(
        self,
        *,
        input_data: Dict[str, str],
        output: Optional[str],
        provider: str,
        model: str,
        config: Dict[str, Any],
        status: str,
        token_count_input: Optional[int] = None,
        token_count_output: Optional[int] = None,
    ) -> None:
        """
        Store a single prompt interaction with metadata and token counts.

        Each call appends a structured record to the log file so usage metrics
        and model behavior can be inspected later.

        Args:
            input_data: Serialized prompt inputs sent to the model provider.
            output: The raw model output string, if available.
            provider: Name of the LLM provider (e.g., OpenAI, Anthropic).
            model: Specific model identifier used for the request.
            config: Provider-specific configuration details for the call.
            status: Execution status for the prompt (e.g., ``"success"`` or
                ``"error"``).
            token_count_input: Optional token count for the prompt payload.
            token_count_output: Optional token count for the model response.
        """
        entry = {
            "entry_type": "prompt_log",
            "datetime": datetime.datetime.utcnow().isoformat(),
            "input": input_data,
            "output": output,
            "provider": provider,
            "model": model,
            "config": config,
            "status": status,
            "token_count_input": token_count_input,
            "token_count_output": token_count_output,
        }
        self._append_log_entry(entry)

    def _iter_prompt_logs(self) -> Iterable[Dict[str, Any]]:
        for entry in self._load_log_entries():
            if entry.get("entry_type") == "prompt_log":
                yield entry

    # ------------------------------------------------------------------
    # Action history logging
    # ------------------------------------------------------------------
    def upsert_action_history(
        self,
        run_id: str,
        *,
        session_id: str,
        parent_id: str | None,
        name: str,
        action_type: str,
        status: str,
        inputs: Dict[str, Any] | None,
        outputs: Dict[str, Any] | None,
        started_at: str | None,
        ended_at: str | None,
    ) -> None:
        """
        Insert or update an action execution history entry.

        The log is keyed by ``run_id``; repeated writes merge new details while
        preserving the initial ``startedAt`` value when absent.

        Args:
            run_id: Unique identifier for the action execution instance.
            session_id: Identifier for the session that triggered the action.
            parent_id: Optional run identifier for the parent action in a tree.
            name: Human-readable action name.
            action_type: Action type label; duplicated into ``type`` for
                backward compatibility.
            status: Current execution status.
            inputs: Serialized action inputs, if available.
            outputs: Serialized action outputs, if available.
            started_at: ISO timestamp for when execution began.
            ended_at: ISO timestamp for when execution completed.
        """
        entries = self._load_log_entries()
        payload = {
            "entry_type": "action_history",
            "runId": run_id,
            "sessionId": session_id,
            "parentId": parent_id,
            "name": name,
            "action_type": action_type,
            "type": action_type,
            "status": status,
            "inputs": inputs,
            "outputs": outputs,
            "startedAt": started_at,
            "endedAt": ended_at,
        }

        found = False
        for entry in entries:
            if entry.get("entry_type") == "action_history" and entry.get("runId") == run_id:
                entry["action_type"] = payload["action_type"]
                entry["type"] = payload["type"]
                entry.update({k: v for k, v in payload.items() if v is not None or k in {"inputs", "outputs"}})
                if entry.get("startedAt") is None:
                    entry["startedAt"] = started_at
                found = True
                break

        if not found:
            if payload["startedAt"] is None:
                payload["startedAt"] = datetime.datetime.utcnow().isoformat()
            entries.append(payload)

        self._write_log_entries(entries)

    def _iter_action_history(self) -> Iterable[Dict[str, Any]]:
        for entry in self._load_log_entries():
            if entry.get("entry_type") == "action_history":
                yield entry

    def find_actions_by_status(self, status: str) -> List[Dict[str, Any]]:
        """
        Return all action history entries matching the given status.

        Args:
            status: Status value to filter (e.g., ``"current"`` or ``"pending"``).

        Returns:
            List of action history dictionaries where ``status`` matches.
        """
        return [entry for entry in self._iter_action_history() if entry.get("status") == status]

    def get_action_history(self, limit: int = 10) -> List[Dict[str, Any]]:
        """
        Retrieve recent action history entries ordered by start time.

        Args:
            limit: Maximum number of entries to return, sorted newest-first.

        Returns:
            A list of action history dictionaries truncated to ``limit``
            entries.
        """
        history = list(self._iter_action_history())
        history.sort(
            key=lambda e: datetime.datetime.fromisoformat(e.get("startedAt") or datetime.datetime.min.isoformat()),
            reverse=True,
        )
        return history[:limit]

    # ------------------------------------------------------------------
    # Task logging helpers
    # ------------------------------------------------------------------
    def log_task(self, task: Task) -> None:
        """
        Persist or update a task log entry for tracking execution progress.

        The task is serialized to JSON-compatible primitives and either
        appended to the log or merged with an existing entry for the same task
        identifier.

        Args:
            task: The :class:`~core.task.task.Task` instance to record.
        """
        doc = {
            "entry_type": "task_log",
            "task_id": task.id,
            "name": task.name,
            "instruction": task.instruction,
            "steps": [asdict(step) for step in task.steps],
            "created_at": task.created_at,
            "status": task.status,
            "results": task.results,
            "updated_at": datetime.datetime.utcnow().isoformat(),
        }

        entries = self._load_log_entries()
        for entry in entries:
            if entry.get("entry_type") == "task_log" and entry.get("task_id") == task.id:
                entry.update(doc)
                break
        else:
            entries.append(doc)

        self._write_log_entries(entries)

    def _iter_task_logs(self) -> Iterable[Dict[str, Any]]:
        for entry in self._load_log_entries():
            if entry.get("entry_type") == "task_log":
                yield entry

    # ------------------------------------------------------------------
    # Action definitions (filesystem + Chroma)
    # ------------------------------------------------------------------
    def _sanitize_action_filename(self, name: str) -> str:
        sanitized = re.sub(r"[^A-Za-z0-9_-]", "_", name).strip("_") or "action"
        return f"{sanitized}.json"

    def _load_actions_from_disk(self) -> List[Dict[str, Any]]:
        actions: List[Dict[str, Any]] = []
        for path in self.actions_dir.glob("*.json"):
            try:
                actions.append(json.loads(path.read_text(encoding="utf-8")))
            except Exception as exc:
                logger.warning(f"[ACTION LOAD] Failed to read {path}: {exc}")
        return actions

    def store_action(self, action_dict: Dict[str, Any]) -> None:
        """
        Persist an action definition and refresh its vector index entry.

        Args:
            action_dict: Action payload to store, expected to include a ``name``
                field used for the filename and Chroma identifier.
        """
        action_dict["updatedAt"] = datetime.datetime.utcnow().isoformat()
        file_name = self._sanitize_action_filename(action_dict["name"])
        path = self.actions_dir / file_name
        path.write_text(json.dumps(action_dict, indent=2, default=str), encoding="utf-8")

        # keep Chroma in sync
        self.chroma_actions.delete(ids=[action_dict["name"]], ignore_missing=True)
        self.chroma_actions.add(
            ids=[action_dict["name"]],
            documents=[action_dict["name"]],
        )

    def list_actions(
        self,
        *,
        default: bool | None = None,
    ) -> List[Dict[str, Any]]:
        """
        Return stored actions optionally filtered by the ``default`` flag.

        Args:
            default: When provided, only return actions whose ``default`` field
                matches the boolean value.

        Returns:
            List of action dictionaries stored on disk that satisfy the filter.
        """
        actions = registry_instance.list_all_actions_as_json()

        if default is not None:
            actions = [action for action in actions if action.get("default") == default]

        return actions

    def get_action(self, name: str) -> Optional[Dict[str, Any]]:
        """
        Fetch a stored action by case-insensitive name match.

        Args:
            name: The human-readable name used to identify the action.

        Returns:
            The action dictionary when found, otherwise ``None``.
        """
        needle = name.lower()
        action = registry_instance.find_action_by_name(action_name=name)

        # if action is None:
        #     for action in self._load_actions_from_disk():
        #         if action.get("name", "").lower() == needle:
        #             return action
        return action

    def delete_action(self, name: str) -> None:
        """
        Remove an action definition from disk and its Chroma index.

        Args:
            name: Name of the action to delete.
        """
        for path in self.actions_dir.glob("*.json"):
            try:
                payload = json.loads(path.read_text(encoding="utf-8"))
            except Exception:
                continue
            if payload.get("name") == name:
                path.unlink(missing_ok=True)
                break
        self.chroma_actions.delete(ids=[name], ignore_missing=True)

    def search_actions(self, query: str, top_k: int = 7) -> List[str]:
        """
        Search stored actions using vector similarity on their names and description.

        Args:
            query: Freeform text used to locate similar action names and description.
            top_k: Maximum number of action identifiers to return.

        Returns:
            List of action names ranked by similarity to ``query``.
        """
        result = self.chroma_actions.query(
            query_texts=[query],
            n_results=top_k,
        )
        return result.get("ids", [[]])[0] if result else []

    def sync_actions_to_chroma(self, paths_to_scan: List[str] = None) -> int:
        """
        Build the Chroma action collection from JSON files on disk.

        Returns:
            Number of action definitions indexed in Chroma.
        """        
        load_actions_from_directories(paths_to_scan=paths_to_scan)

        actions: List[Dict[str, Any]] = registry_instance.list_all_actions_as_json()

        try:
            existing = self.chroma_actions.get()
            ids = existing.get("ids", []) if existing else []
            if ids:
                self.chroma_actions.delete(ids=list(ids))
        except Exception:
            pass

        ids: List[str] = []
        documents: List[str] = []
        for action in actions:
            name = action.get("name")
            if not name:
                continue
            ids.append(name)
            documents.append(name)

        if not ids:
            return 0

        self.chroma_actions.add(ids=ids, documents=documents)
        return len(ids)

    # ------------------------------------------------------------------
    # Agent configuration
    # ------------------------------------------------------------------
    def set_agent_info(self, info: Dict[str, Any], key: str = "singleton") -> None:
        """
        Persist arbitrary agent configuration under the provided key.

        Args:
            info: Mapping of configuration fields to store.
            key: Logical namespace under which the configuration is saved.
        """        
        try:
            existing = json.loads(self.agent_info_path.read_text(encoding="utf-8"))
        except Exception:
            existing = {}
        existing[key] = {**existing.get(key, {}), **info}
        self.agent_info_path.write_text(json.dumps(existing, indent=2), encoding="utf-8")

    def get_agent_info(self, key: str = "singleton") -> Optional[Dict[str, Any]]:
        """
        Load persisted agent configuration for the given key.

        Args:
            key: Namespace key used when persisting the configuration.

        Returns:
            A configuration dictionary when present, otherwise ``None``.
        """        
        try:
            info = json.loads(self.agent_info_path.read_text(encoding="utf-8"))
        except Exception:
            return None
        return info.get(key)

    # ------------------------------------------------------------------
    # Task documents (filesystem + Chroma)
    # ------------------------------------------------------------------
    def _extract_task_document_metadata(self, raw_text: str, fallback_name: str) -> tuple[str, str]:
        name: Optional[str] = None
        description: Optional[str] = None
        for line in raw_text.splitlines():
            stripped = line.strip()
            if not stripped:
                continue
            lowered = stripped.lower()
            if lowered.startswith("name:") and not name:
                name = stripped.split(":", 1)[1].strip() or None
            elif lowered.startswith("description:") and not description:
                description = stripped.split(":", 1)[1].strip() or None
            if name and description:
                break

        if not name:
            name = fallback_name
        if not description:
            first_para = next((blk.strip() for blk in raw_text.split("\n\n") if blk.strip()), "")
            description = first_para[:400]
        return name, description

    def _load_task_documents_from_disk(self) -> List[Dict[str, Any]]:
        docs: List[Dict[str, Any]] = []
        for path in sorted(self.task_docs_dir.glob("*.txt")):
            try:
                raw_text = path.read_text(encoding="utf-8")
            except Exception as exc:
                logger.warning(f"[TASKDOC LOAD] Failed to read {path}: {exc}")
                continue

            name, description = self._extract_task_document_metadata(raw_text, path.stem)
            docs.append(
                {
                    "task_id": path.stem,
                    "name": name,
                    "description": description,
                    "raw_text": raw_text,
                    "source_path": str(path),
                }
            )
        return docs

    def sync_task_documents_to_chroma(self) -> int:
        """
        Rebuild the Chroma collection from task document text files.

        Returns:
            Number of task documents indexed in Chroma after the sync.
        """        
        docs = self._load_task_documents_from_disk()
        try:
            existing = self.chroma_taskdocs_coll.get()
            ids = existing.get("ids", []) if existing else []
            if ids:
                self.chroma_taskdocs_coll.delete(ids=list(ids))
        except Exception:
            pass

        if not docs:
            return 0

        ids: List[str] = []
        documents: List[str] = []
        metadatas: List[Dict[str, Any]] = []
        for doc in docs:
            ids.append(doc["task_id"])
            documents.append(f"{doc['name']}\n\n{doc['description']}")
            metadatas.append({"name": doc["name"]})

        self.chroma_taskdocs_coll.add(ids=ids, documents=documents, metadatas=metadatas)
        return len(ids)

    def retrieve_similar_task_documents(self, query: str, top_k: int = 5) -> List[Dict[str, Any]]:
        """
        Return task documents ranked by similarity to the query text.

        Args:
            query: Text prompt used for semantic similarity search.
            top_k: Maximum number of documents to return.

        Returns:
            Ordered list of task document dictionaries in descending similarity
            rank.
        """        
        if not query or not query.strip():
            logger.debug("[NO QUERY FOUND]")
            return []

        result = self.chroma_taskdocs_coll.query(
            query_texts=[query],
            n_results=top_k,
        )

        ids = result.get("ids", [[]])[0] if result else []
        if not ids:
            return []

        docs_by_id = {doc["task_id"]: doc for doc in self._load_task_documents_from_disk()}
        docs: List[Dict[str, Any]] = []
        for doc_id in ids:
            doc = docs_by_id.get(doc_id)
            if doc:
                docs.append(doc)

        order = {tid: i for i, tid in enumerate(ids)}
        docs.sort(key=lambda d: order.get(d.get("task_id", ""), 10**9))
        return docs

    def get_task_document_texts(self, query: str, top_k: int = 3) -> List[str]:
        """
        Fetch raw task document texts based on similarity search results.

        Args:
            query: Freeform text query used to find related documents.
            top_k: Maximum number of raw documents to return.

        Returns:
            List of raw task document strings ordered by similarity.
        """        
        matches = self.retrieve_similar_task_documents(query, top_k=top_k)
        return [m.get("raw_text", "") for m in matches]

    # ------------------------------------------------------------------
    # Task helpers (for recovery)
    # ------------------------------------------------------------------
    def find_current_task_steps(self) -> List[Dict[str, Any]]:
        """
        List steps across all tasks that are marked as ``current``.
        Ideally, it should only return one step. However, there might be case 
        where multple steps are assigned with ``current`` due to LLM error.

        Returns:
            A list of dictionaries pairing ``task_id`` with the active step
            metadata.
        """        
        results: List[Dict[str, Any]] = []
        for entry in self._iter_task_logs():
            task_id = entry.get("task_id")
            for step in entry.get("steps", []):
                if step.get("status") == "current":
                    results.append({"task_id": task_id, "step": step})
        return results

    def update_step_status(
        self,
        task_id: str,
        action_id: str,
        status: str,
        failure_message: Optional[str] = None,
    ) -> None:
        """
        Update the status of a task step and persist the change.

        Args:
            task_id: Identifier for the task owning the step.
            action_id: The step's action identifier used to locate it.
            status: New status string to assign to the step.
            failure_message: Optional failure detail to attach when updating.
        """        
        entries = self._load_log_entries()
        updated = False
        for entry in entries:
            if entry.get("entry_type") != "task_log" or entry.get("task_id") != task_id:
                continue
            for step in entry.get("steps", []):
                if step.get("action_id") == action_id:
                    step["status"] = status
                    if failure_message is not None:
                        step["failure_message"] = failure_message
                    updated = True
                    break
            if updated:
                entry["updated_at"] = datetime.datetime.utcnow().isoformat()
                break
        if updated:
            self._write_log_entries(entries)

__init__(*, data_dir='core/data', chroma_path='./chroma_db', log_file=None)

Initialize storage directories and vector stores for agent data.

The constructor sets up filesystem paths for logs, actions, task documents, and agent info. It also initializes separate Chroma collections for actions and task documents, ensuring they are synced with existing files on startup.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `data_dir` | `str` | Base directory used to persist logs and JSON artifacts. | `'core/data'` |
| `chroma_path` | `str` | Root path for ChromaDB persistence; distinct suffixes are used for actions and task documents. | `'./chroma_db'` |
| `log_file` | `Optional[str]` | Optional explicit log file path; defaults to `<data_dir>/agent_logs.txt` when omitted. | `None` |
Source code in core\database_interface.py
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
def __init__(
    self,
    *,
    data_dir: str = "core/data",
    chroma_path: str = "./chroma_db",
    log_file: Optional[str] = None,
) -> None:
    """
    Prepare on-disk storage and the Chroma vector indexes used by the agent.

    Creates the data directory with its ``action`` and ``task_document``
    sub-folders, the log file, and an empty ``agent_info.json`` when none
    exists. It then opens one persistent Chroma client for actions and another
    for task documents, and re-indexes both from their filesystem sources.

    Args:
        data_dir: Folder that holds the log file and JSON artifacts.
        chroma_path: Prefix for the Chroma storage paths; ``_actions`` and
            ``_taskdocs`` are appended for the two clients.
        log_file: Explicit log file location. When falsy, the log lives at
            ``<data_dir>/agent_logs.txt``.
    """
    base = Path(data_dir)
    self.data_dir = base
    base.mkdir(parents=True, exist_ok=True)

    if log_file:
        self.log_file_path = Path(log_file)
    else:
        self.log_file_path = base / "agent_logs.txt"
    self.actions_dir = base / "action"
    self.task_docs_dir = base / "task_document"
    self.agent_info_path = base / "agent_info.json"

    for folder in (self.actions_dir, self.task_docs_dir):
        folder.mkdir(parents=True, exist_ok=True)
    self.log_file_path.touch(exist_ok=True)
    if not self.agent_info_path.exists():
        self.agent_info_path.write_text("{}", encoding="utf-8")

    # Actions and task documents each get their own persistent Chroma client.
    self.chroma = chromadb.PersistentClient(path=f"{chroma_path}_actions")
    self.chroma_actions = self.chroma.get_or_create_collection("agent_actions")
    self.chroma_taskdocs = chromadb.PersistentClient(path=f"{chroma_path}_taskdocs")
    self.chroma_taskdocs_coll = self.chroma_taskdocs.get_or_create_collection("task_documents")

    # Rebuild the action index from disk so search reflects current files.
    self.sync_actions_to_chroma(paths_to_scan=[self.actions_dir])

    indexed_ids = self.chroma_actions.get().get("ids", [])
    total = len(indexed_ids)
    if total <= 0:
        logger.warning("⚠️ ChromaDB sync completed, but the collection is empty.")
    else:
        joined = ", ".join(sorted(indexed_ids))
        logger.info(f"✅ ChromaDB sync successful. Collection now holds {total} actions: [{joined}]")

    self.sync_task_documents_to_chroma()

log_prompt(*, input_data, output, provider, model, config, status, token_count_input=None, token_count_output=None)

Store a single prompt interaction with metadata and token counts.

Each call appends a structured record to the log file so usage metrics and model behavior can be inspected later.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `input_data` | `Dict[str, str]` | Serialized prompt inputs sent to the model provider. | *required* |
| `output` | `Optional[str]` | The raw model output string, if available. | *required* |
| `provider` | `str` | Name of the LLM provider (e.g., OpenAI, Anthropic). | *required* |
| `model` | `str` | Specific model identifier used for the request. | *required* |
| `config` | `Dict[str, Any]` | Provider-specific configuration details for the call. | *required* |
| `status` | `str` | Execution status for the prompt (e.g., `"success"` or `"error"`). | *required* |
| `token_count_input` | `Optional[int]` | Optional token count for the prompt payload. | `None` |
| `token_count_output` | `Optional[int]` | Optional token count for the model response. | `None` |
Source code in core\database_interface.py
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
def log_prompt(
    self,
    *,
    input_data: Dict[str, str],
    output: Optional[str],
    provider: str,
    model: str,
    config: Dict[str, Any],
    status: str,
    token_count_input: Optional[int] = None,
    token_count_output: Optional[int] = None,
) -> None:
    """
    Append one prompt/response record to the log.

    The record is tagged with ``entry_type="prompt_log"`` and stamped with a
    naive UTC ISO-8601 timestamp, then handed to ``self._append_log_entry``.

    Args:
        input_data: Serialized prompt inputs sent to the model provider.
        output: Raw model output string, or ``None`` if unavailable.
        provider: Name of the LLM provider (e.g., OpenAI, Anthropic).
        model: Model identifier used for the request.
        config: Provider-specific configuration for the call.
        status: Outcome label for the call (e.g., ``"success"`` or
            ``"error"``).
        token_count_input: Token count of the prompt payload, if known.
        token_count_output: Token count of the model response, if known.
    """
    logged_at = datetime.datetime.utcnow().isoformat()
    record: Dict[str, Any] = dict(
        entry_type="prompt_log",
        datetime=logged_at,
        input=input_data,
        output=output,
        provider=provider,
        model=model,
        config=config,
        status=status,
        token_count_input=token_count_input,
        token_count_output=token_count_output,
    )
    self._append_log_entry(record)

upsert_action_history(run_id, *, session_id, parent_id, name, action_type, status, inputs, outputs, started_at, ended_at)

Insert or update an action execution history entry.

The log is keyed by run_id; repeated writes merge new details into the existing entry, keeping the stored startedAt value when no new one is supplied.

Parameters:

Name Type Description Default
run_id str

Unique identifier for the action execution instance.

required
session_id str

Identifier for the session that triggered the action.

required
parent_id str | None

Optional run identifier for the parent action in a tree.

required
name str

Human-readable action name.

required
action_type str

Action type label; duplicated into type for backward compatibility.

required
status str

Current execution status.

required
inputs Dict[str, Any] | None

Serialized action inputs, if available.

required
outputs Dict[str, Any] | None

Serialized action outputs, if available.

required
started_at str | None

ISO timestamp for when execution began.

required
ended_at str | None

ISO timestamp for when execution completed.

required
Source code in core\database_interface.py
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
def upsert_action_history(
    self,
    run_id: str,
    *,
    session_id: str,
    parent_id: str | None,
    name: str,
    action_type: str,
    status: str,
    inputs: Dict[str, Any] | None,
    outputs: Dict[str, Any] | None,
    started_at: str | None,
    ended_at: str | None,
) -> None:
    """
    Insert or update an action execution history entry.

    The log is keyed by ``run_id``; repeated writes merge new details while
    preserving the initial ``startedAt`` value when absent.

    Args:
        run_id: Unique identifier for the action execution instance.
        session_id: Identifier for the session that triggered the action.
        parent_id: Optional run identifier for the parent action in a tree.
        name: Human-readable action name.
        action_type: Action type label; duplicated into ``type`` for
            backward compatibility.
        status: Current execution status.
        inputs: Serialized action inputs, if available.
        outputs: Serialized action outputs, if available.
        started_at: ISO timestamp for when execution began.
        ended_at: ISO timestamp for when execution completed.
    """
    entries = self._load_log_entries()
    payload = {
        "entry_type": "action_history",
        "runId": run_id,
        "sessionId": session_id,
        "parentId": parent_id,
        "name": name,
        "action_type": action_type,
        "type": action_type,
        "status": status,
        "inputs": inputs,
        "outputs": outputs,
        "startedAt": started_at,
        "endedAt": ended_at,
    }

    found = False
    for entry in entries:
        if entry.get("entry_type") == "action_history" and entry.get("runId") == run_id:
            entry["action_type"] = payload["action_type"]
            entry["type"] = payload["type"]
            entry.update({k: v for k, v in payload.items() if v is not None or k in {"inputs", "outputs"}})
            if entry.get("startedAt") is None:
                entry["startedAt"] = started_at
            found = True
            break

    if not found:
        if payload["startedAt"] is None:
            payload["startedAt"] = datetime.datetime.utcnow().isoformat()
        entries.append(payload)

    self._write_log_entries(entries)

find_actions_by_status(status)

Return all action history entries matching the given status.

Parameters:

Name Type Description Default
status str

Status value to filter (e.g., "current" or "pending").

required

Returns:

Type Description
List[Dict[str, Any]]

List of action history dictionaries where status matches.

Source code in core\database_interface.py
247
248
249
250
251
252
253
254
255
256
257
def find_actions_by_status(self, status: str) -> List[Dict[str, Any]]:
    """
    Collect the action history entries whose ``status`` field equals ``status``.

    Args:
        status: Status value to filter on (e.g., ``"current"`` or
            ``"pending"``).

    Returns:
        Matching action history dictionaries, in iteration order.
    """
    def _has_status(record: Dict[str, Any]) -> bool:
        return record.get("status") == status

    return list(filter(_has_status, self._iter_action_history()))

get_action_history(limit=10)

Retrieve recent action history entries ordered by start time.

Parameters:

Name Type Description Default
limit int

Maximum number of entries to return, sorted newest-first.

10

Returns:

Type Description
List[Dict[str, Any]]

A list of action history dictionaries truncated to limit entries.

Source code in core\database_interface.py
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
def get_action_history(self, limit: int = 10) -> List[Dict[str, Any]]:
    """
    Retrieve recent action history entries ordered by start time.

    Entries are sorted newest-first by ``startedAt``. Offset-aware
    timestamps (including a trailing ``"Z"``) are converted to naive UTC so
    they can be compared with the naive ``utcnow()`` timestamps this class
    writes itself. Entries whose ``startedAt`` is missing, not a string, or
    unparseable sort as ``datetime.min`` (i.e. last), keeping their relative
    order.

    Args:
        limit: Maximum number of entries to return.

    Returns:
        Up to ``limit`` action history dictionaries, newest first.
    """

    def _start_key(entry: Dict[str, Any]) -> datetime.datetime:
        # Sort key: naive-UTC start time, or datetime.min when unusable.
        raw = entry.get("startedAt")
        if not raw:
            return datetime.datetime.min
        if isinstance(raw, str) and raw.endswith("Z"):
            # fromisoformat() only accepts "Z" from Python 3.11 onwards.
            raw = raw[:-1] + "+00:00"
        try:
            parsed = datetime.datetime.fromisoformat(raw)
            if parsed.tzinfo is not None:
                # Mixing naive and aware datetimes in one sort raises TypeError.
                parsed = parsed.astimezone(datetime.timezone.utc).replace(tzinfo=None)
        except (TypeError, ValueError, OverflowError):
            return datetime.datetime.min
        return parsed

    history = sorted(self._iter_action_history(), key=_start_key, reverse=True)
    return history[:limit]

log_task(task)

Persist or update a task log entry for tracking execution progress.

The task is serialized to JSON-compatible primitives and either appended to the log or merged with an existing entry for the same task identifier.

Parameters:

Name Type Description Default
task Task

The :class:~core.task.task.Task instance to record.

required
Source code in core\database_interface.py
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
def log_task(self, task: Task) -> None:
    """
    Write a snapshot of ``task`` into the log, replacing any earlier snapshot.

    The snapshot holds the task's id, name, instruction, steps (each
    converted with ``dataclasses.asdict``), creation time, status and
    results, plus an ``updated_at`` naive UTC timestamp. The first existing
    ``task_log`` entry with the same ``task_id`` is updated in place;
    otherwise the snapshot is appended.

    Args:
        task: The :class:`~core.task.task.Task` instance to record.
    """
    snapshot = {
        "entry_type": "task_log",
        "task_id": task.id,
        "name": task.name,
        "instruction": task.instruction,
        "steps": list(map(asdict, task.steps)),
        "created_at": task.created_at,
        "status": task.status,
        "results": task.results,
        "updated_at": datetime.datetime.utcnow().isoformat(),
    }

    log = self._load_log_entries()
    previous = next(
        (
            item
            for item in log
            if item.get("entry_type") == "task_log" and item.get("task_id") == task.id
        ),
        None,
    )
    if previous is None:
        log.append(snapshot)
    else:
        previous.update(snapshot)

    self._write_log_entries(log)

store_action(action_dict)

Persist an action definition and refresh its vector index entry.

Parameters:

Name Type Description Default
action_dict Dict[str, Any]

Action payload to store, expected to include a name field used for the filename and Chroma identifier.

required
Source code in core\database_interface.py
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
def store_action(self, action_dict: Dict[str, Any]) -> None:
    """
    Persist an action definition and refresh its vector index entry.

    ``action_dict`` is modified in place: an ``updatedAt`` naive UTC
    ISO-8601 timestamp is added before the dict is written as JSON to
    ``self.actions_dir / self._sanitize_action_filename(name)``. The Chroma
    entry whose id is the action name is then replaced; only the name is
    indexed as the document text.

    Args:
        action_dict: Action payload to store; must include a ``name`` field
            used for the filename and Chroma identifier.

    Raises:
        KeyError: If ``action_dict`` has no ``name`` key.
    """
    action_dict["updatedAt"] = datetime.datetime.utcnow().isoformat()
    name = action_dict["name"]
    path = self.actions_dir / self._sanitize_action_filename(name)
    path.write_text(json.dumps(action_dict, indent=2, default=str), encoding="utf-8")

    # Keep Chroma in sync: drop any stale entry, then index the new one.
    # Chroma's Collection.delete() accepts only ids/where/where_document, so
    # the previous ``ignore_missing=True`` kwarg raised TypeError here.
    self.chroma_actions.delete(ids=[name])
    self.chroma_actions.add(ids=[name], documents=[name])

list_actions(*, default=None)

Return stored actions optionally filtered by the default flag.

Parameters:

Name Type Description Default
default bool | None

When provided, only return actions whose default field matches the boolean value.

None

Returns:

Type Description
List[Dict[str, Any]]

List of action dictionaries stored on disk that satisfy the filter.

Source code in core\database_interface.py
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
def list_actions(
    self,
    *,
    default: bool | None = None,
) -> List[Dict[str, Any]]:
    """
    Return the actions known to the action registry, optionally filtered.

    Args:
        default: When ``None`` (the default), every action returned by
            ``registry_instance.list_all_actions_as_json()`` is returned
            unchanged. Otherwise only actions whose ``default`` field equals
            this value are kept.

    Returns:
        List of action dictionaries that satisfy the filter.
    """
    every_action = registry_instance.list_all_actions_as_json()
    if default is None:
        return every_action
    return [entry for entry in every_action if entry.get("default") == default]

get_action(name)

Fetch a stored action by case-insensitive name match.

Parameters:

Name Type Description Default
name str

The human-readable name used to identify the action.

required

Returns:

Type Description
Optional[Dict[str, Any]]

The action dictionary when found, otherwise None.

Source code in core\database_interface.py
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
def get_action(self, name: str) -> Optional[Dict[str, Any]]:
    """
    Fetch a stored action by name from the action registry.

    The lookup is delegated entirely to
    ``registry_instance.find_action_by_name``. NOTE(review): this method was
    documented as a case-insensitive match; whether that holds depends on
    the registry implementation — confirm there.

    Args:
        name: The human-readable name used to identify the action.

    Returns:
        The action dictionary when found, otherwise ``None``.
    """
    return registry_instance.find_action_by_name(action_name=name)

delete_action(name)

Remove an action definition from disk and its Chroma index.

Parameters:

Name Type Description Default
name str

Name of the action to delete.

required
Source code in core\database_interface.py
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
def delete_action(self, name: str) -> None:
    """
    Remove an action definition from disk and its Chroma index.

    Each ``*.json`` file in ``self.actions_dir`` is inspected, and the first
    one whose top-level ``name`` equals ``name`` is deleted. Files that
    cannot be read or decoded, or whose top-level JSON value is not an
    object, are skipped. The Chroma entry with id ``name`` is removed
    whether or not a file was found.

    Args:
        name: Name of the action to delete.
    """
    for path in self.actions_dir.glob("*.json"):
        try:
            payload = json.loads(path.read_text(encoding="utf-8"))
        except (OSError, ValueError):
            # Unreadable or malformed JSON (JSONDecodeError/UnicodeDecodeError
            # are ValueErrors); it cannot be the action we are looking for.
            continue
        if isinstance(payload, dict) and payload.get("name") == name:
            path.unlink(missing_ok=True)
            break
    # Chroma's Collection.delete() has no ``ignore_missing`` parameter.
    self.chroma_actions.delete(ids=[name])

search_actions(query, top_k=7)

Search stored actions using vector similarity on their names (only the action name is indexed as the document text).

Parameters:

Name Type Description Default
query str

Freeform text used to locate similar action names.

required
top_k int

Maximum number of action identifiers to return.

7

Returns:

Type Description
List[str]

List of action names ranked by similarity to query.

Source code in core\database_interface.py
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
def search_actions(self, query: str, top_k: int = 7) -> List[str]:
    """
    Find the stored actions whose indexed names are most similar to ``query``.

    Only the action name is indexed as the Chroma document text, so
    similarity is computed against names.

    Args:
        query: Freeform text used to locate similar action names.
        top_k: Maximum number of action identifiers to return.

    Returns:
        Action names ranked by similarity to ``query``; empty when Chroma
        returns nothing.
    """
    response = self.chroma_actions.query(query_texts=[query], n_results=top_k)
    if not response:
        return []
    # A single query text was sent, so the first row holds its ranked ids.
    return response.get("ids", [[]])[0]

sync_actions_to_chroma(paths_to_scan=None)

Build the Chroma action collection from JSON files on disk.

Returns:

Type Description
int

Number of action definitions indexed in Chroma.

Source code in core\database_interface.py
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
def sync_actions_to_chroma(self, paths_to_scan: Optional[List[Any]] = None) -> int:
    """
    Rebuild the Chroma action collection from the action registry.

    ``paths_to_scan`` is forwarded unchanged to
    ``load_actions_from_directories`` (presumably loading action JSON files
    into ``registry_instance`` — confirm there). Every id currently in
    ``self.chroma_actions`` is then deleted, and each named action from
    ``registry_instance.list_all_actions_as_json()`` is re-added, using its
    name as both id and document text.

    Args:
        paths_to_scan: Directories to scan, or ``None``. The constructor
            passes ``[self.actions_dir]``, so path-like values are accepted
            as well as strings.

    Returns:
        Number of action definitions indexed in Chroma.
    """
    load_actions_from_directories(paths_to_scan=paths_to_scan)

    actions: List[Dict[str, Any]] = registry_instance.list_all_actions_as_json()

    # Clearing is best effort, but a failure is logged rather than hidden.
    try:
        existing = self.chroma_actions.get()
        stale_ids = existing.get("ids", []) if existing else []
        if stale_ids:
            self.chroma_actions.delete(ids=list(stale_ids))
    except Exception:
        logger.warning(
            "Failed to clear the Chroma action collection before re-sync",
            exc_info=True,
        )

    # Chroma rejects duplicate ids within one add() call, so keep only the
    # first action for each name; unnamed actions are skipped.
    ids: List[str] = []
    seen_names = set()
    for action in actions:
        name = action.get("name")
        if not name or name in seen_names:
            continue
        seen_names.add(name)
        ids.append(name)

    if not ids:
        return 0

    self.chroma_actions.add(ids=ids, documents=list(ids))
    return len(ids)

set_agent_info(info, key='singleton')

Persist arbitrary agent configuration under the provided key.

Parameters:

Name Type Description Default
info Dict[str, Any]

Mapping of configuration fields to store.

required
key str

Logical namespace under which the configuration is saved.

'singleton'
Source code in core\database_interface.py
466
467
468
469
470
471
472
473
474
475
476
477
478
479
def set_agent_info(self, info: Dict[str, Any], key: str = "singleton") -> None:
    """
    Merge ``info`` into the agent configuration stored under ``key``.

    ``self.agent_info_path`` holds a JSON object that maps keys to
    configuration dicts. Fields already stored under ``key`` are kept unless
    ``info`` overrides them. A missing, unreadable or malformed file,
    including one whose top-level value is not a JSON object, is treated as
    empty and rewritten. A non-dict value under ``key`` is likewise replaced.

    Args:
        info: Mapping of configuration fields to store.
        key: Logical namespace under which the configuration is saved.
    """
    try:
        existing = json.loads(self.agent_info_path.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # Missing file, undecodable bytes or invalid JSON: start fresh.
        existing = {}
    if not isinstance(existing, dict):
        existing = {}

    current = existing.get(key)
    if not isinstance(current, dict):
        current = {}
    existing[key] = {**current, **info}
    self.agent_info_path.write_text(json.dumps(existing, indent=2), encoding="utf-8")

get_agent_info(key='singleton')

Load persisted agent configuration for the given key.

Parameters:

Name Type Description Default
key str

Namespace key used when persisting the configuration.

'singleton'

Returns:

Type Description
Optional[Dict[str, Any]]

A configuration dictionary when present, otherwise None.

Source code in core\database_interface.py
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
def get_agent_info(self, key: str = "singleton") -> Optional[Dict[str, Any]]:
    """
    Load persisted agent configuration for the given key.

    Args:
        key: Namespace key used when persisting the configuration.

    Returns:
        The value stored under ``key``, or ``None`` when the file is missing,
        unreadable or malformed, when its top-level value is not a JSON
        object, or when ``key`` is absent.
    """
    try:
        stored = json.loads(self.agent_info_path.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # Missing file, undecodable bytes or invalid JSON.
        return None
    if not isinstance(stored, dict):
        return None
    return stored.get(key)

sync_task_documents_to_chroma()

Rebuild the Chroma collection from task document text files.

Returns:

Type Description
int

Number of task documents indexed in Chroma after the sync.

Source code in core\database_interface.py
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
def sync_task_documents_to_chroma(self) -> int:
    """
    Re-index every task document from disk into the task-document collection.

    All ids currently in ``self.chroma_taskdocs_coll`` are deleted first.
    Errors during that step are ignored. Each document loaded by
    ``self._load_task_documents_from_disk()`` is then added, with its
    ``task_id`` as the id, ``"<name>\\n\\n<description>"`` as the text and
    ``{"name": <name>}`` as metadata.

    Returns:
        Number of task documents indexed in Chroma after the sync.
    """
    task_docs = self._load_task_documents_from_disk()

    # Best effort: wipe whatever is currently indexed before re-adding.
    try:
        snapshot = self.chroma_taskdocs_coll.get()
        indexed = snapshot.get("ids", []) if snapshot else []
        if indexed:
            self.chroma_taskdocs_coll.delete(ids=list(indexed))
    except Exception:
        pass

    if not task_docs:
        return 0

    rows = [
        (d["task_id"], f"{d['name']}\n\n{d['description']}", {"name": d["name"]})
        for d in task_docs
    ]
    doc_ids, texts, metas = (list(column) for column in zip(*rows))

    self.chroma_taskdocs_coll.add(ids=doc_ids, documents=texts, metadatas=metas)
    return len(doc_ids)

retrieve_similar_task_documents(query, top_k=5)

Return task documents ranked by similarity to the query text.

Parameters:

Name Type Description Default
query str

Text prompt used for semantic similarity search.

required
top_k int

Maximum number of documents to return.

5

Returns:

Type Description
List[Dict[str, Any]]

Ordered list of task document dictionaries in descending similarity rank.

Source code in core\database_interface.py
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
def retrieve_similar_task_documents(self, query: str, top_k: int = 5) -> List[Dict[str, Any]]:
    """
    Return task documents ranked by similarity to the query text.

    A blank or empty ``query`` short-circuits to an empty list without
    querying Chroma. Ids returned by Chroma that have no matching document
    on disk are dropped.

    Args:
        query: Text prompt used for semantic similarity search.
        top_k: Maximum number of documents to return.

    Returns:
        Task document dictionaries, most similar first.
    """
    if not (query and query.strip()):
        logger.debug("[NO QUERY FOUND]")
        return []

    response = self.chroma_taskdocs_coll.query(query_texts=[query], n_results=top_k)
    ranked_ids = response.get("ids", [[]])[0] if response else []
    if not ranked_ids:
        return []

    lookup = {entry["task_id"]: entry for entry in self._load_task_documents_from_disk()}
    found: List[Dict[str, Any]] = [
        doc for doc_id in ranked_ids if (doc := lookup.get(doc_id))
    ]

    # Re-apply Chroma's ranking; unknown task ids would sort last.
    position = {tid: rank for rank, tid in enumerate(ranked_ids)}
    return sorted(found, key=lambda item: position.get(item.get("task_id", ""), 10**9))

get_task_document_texts(query, top_k=3)

Fetch raw task document texts based on similarity search results.

Parameters:

Name Type Description Default
query str

Freeform text query used to find related documents.

required
top_k int

Maximum number of raw documents to return.

3

Returns:

Type Description
List[str]

List of raw task document strings ordered by similarity.

Source code in core\database_interface.py
609
610
611
612
613
614
615
616
617
618
619
620
621
def get_task_document_texts(self, query: str, top_k: int = 3) -> List[str]:
    """
    Return the ``raw_text`` of the task documents most similar to ``query``.

    Delegates the search to ``self.retrieve_similar_task_documents``.
    Documents without a ``raw_text`` field contribute an empty string.

    Args:
        query: Freeform text query used to find related documents.
        top_k: Maximum number of raw documents to return.

    Returns:
        Raw task document strings ordered by similarity.
    """
    texts: List[str] = []
    for hit in self.retrieve_similar_task_documents(query, top_k=top_k):
        texts.append(hit.get("raw_text", ""))
    return texts

find_current_task_steps()

List steps across all tasks that are marked as current. Ideally, only one step is returned; however, there may be cases where multiple steps are marked as current due to an LLM error.

Returns:

Type Description
List[Dict[str, Any]]

A list of dictionaries pairing task_id with the active step metadata.

Source code in core\database_interface.py
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
def find_current_task_steps(self) -> List[Dict[str, Any]]:
    """
    List steps across all tasks whose ``status`` is ``"current"``.

    Ideally only one step is returned. However, several steps may end up
    marked ``current`` due to an LLM error, so every match is reported.

    Returns:
        One ``{"task_id": ..., "step": ...}`` dictionary per current step,
        in task-log order.
    """
    return [
        {"task_id": task_log.get("task_id"), "step": step}
        for task_log in self._iter_task_logs()
        for step in task_log.get("steps", [])
        if step.get("status") == "current"
    ]

update_step_status(task_id, action_id, status, failure_message=None)

Update the status of a task step and persist the change.

Parameters:

Name Type Description Default
task_id str

Identifier for the task owning the step.

required
action_id str

The step's action identifier used to locate it.

required
status str

New status string to assign to the step.

required
failure_message Optional[str]

Optional failure detail to attach when updating.

None
Source code in core\database_interface.py
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
def update_step_status(
    self,
    task_id: str,
    action_id: str,
    status: str,
    failure_message: Optional[str] = None,
) -> None:
    """
    Set the status of one step in a logged task and persist the change.

    The first ``task_log`` entry for ``task_id`` that contains a step with
    ``action_id`` is modified: the step's ``status`` is replaced,
    ``failure_message`` is stored when given, and the entry's ``updated_at``
    is refreshed. Nothing is written if no such step exists.

    Args:
        task_id: Identifier for the task owning the step.
        action_id: The step's action identifier used to locate it.
        status: New status string to assign to the step.
        failure_message: Optional failure detail to attach when updating.
    """
    log = self._load_log_entries()

    owner = None
    target_step = None
    for item in log:
        if item.get("entry_type") == "task_log" and item.get("task_id") == task_id:
            target_step = next(
                (s for s in item.get("steps", []) if s.get("action_id") == action_id),
                None,
            )
            if target_step is not None:
                owner = item
                break

    if owner is None:
        return

    target_step["status"] = status
    if failure_message is not None:
        target_step["failure_message"] = failure_message
    owner["updated_at"] = datetime.datetime.utcnow().isoformat()
    self._write_log_entries(log)