Skip to content

core / event_stream / event_stream_manager

core.event_stream.event_stream_manager

core.event_stream.event_stream_manager.

Event stream manager that manages, stores, and returns concurrent event streams running under several active tasks.

EventStreamManager

Source code in core\event_stream\event_stream_manager.py
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
class EventStreamManager:
    """Thin facade over a single :class:`EventStream`.

    The manager builds one stream in ``__init__`` and forwards logging,
    clearing and snapshot requests to it. There is no per-session registry
    in this class.
    """

    def __init__(self, llm: LLMInterface) -> None:
        """Create the manager together with its event stream.

        Args:
            llm: LLM interface passed to the ``EventStream`` constructor and
                also kept on the manager as ``self.llm``.
        """
        # The single event stream owned by this manager instance.
        self.event_stream: EventStream = EventStream(llm=llm, temp_dir=None)
        self.llm = llm

    # ───────────────────────────── lifecycle ─────────────────────────────

    def get_stream(self) -> EventStream:
        """Return the event stream held by this manager.

        This is a plain accessor for ``self.event_stream``; it performs no
        lookup and never substitutes ``None`` itself.
        """
        return self.event_stream

    def clear_all(self) -> None:
        """Clear the managed event stream by calling its ``clear`` method."""
        self.event_stream.clear()

    # ───────────────────────────── utilities ─────────────────────────────

    def log(
        self,
        kind: str,
        message: str,
        severity: str = "INFO",
        *,
        display_message: str | None = None,
        action_name: str | None = None,
    ) -> int:
        """
        Log an event to the managed event stream.

        A debug breadcrumb is written through the module-level ``logger``
        first; the arguments are then forwarded unchanged to the ``log``
        method of the stream returned by :meth:`get_stream`.

        Args:
            kind: Event family such as ``"action_start"`` or ``"warn"``.
            message: Main event text.
            severity: Importance level, defaulting to ``"INFO"``.
            display_message: Optional trimmed message for UI surfaces.
            action_name: Optional action label for file-based externalization.

        Returns:
            The value returned by the stream's ``log`` call (annotated as the
            integer index of the logged event).
        """
        logger.debug(f"Process Started - Logging event to stream: [{severity}] {kind} - {message}")
        stream = self.get_stream()
        return stream.log(
            kind,
            message,
            severity,
            display_message=display_message,
            action_name=action_name,
        )

    def snapshot(self, include_summary: bool = True) -> str:
        """Return the stream's prompt snapshot, or ``'(no events)'`` if the stream is falsy.

        Args:
            include_summary: Forwarded as-is to the stream's
                ``to_prompt_snapshot``.
        """
        stream = self.get_stream()
        # NOTE(review): this is a truthiness test, not an ``is None`` test, so
        # the outcome depends on how the stream object evaluates as a boolean
        # -- confirm whether EventStream defines ``__bool__``/``__len__``.
        if not stream:
            return "(no events)"
        return stream.to_prompt_snapshot(include_summary=include_summary)

get_stream()

Return the event stream held by this manager.

Source code in core\event_stream\event_stream_manager.py
26
27
28
def get_stream(self) -> EventStream:
    """Return the event stream held by this manager.

    This is a plain accessor for ``self.event_stream``; it performs no lookup
    and never substitutes ``None`` itself.
    """
    return self.event_stream

clear_all()

Remove all event streams.

Source code in core\event_stream\event_stream_manager.py
30
31
32
def clear_all(self) -> None:
    """Clear the managed event stream by calling its ``clear`` method."""
    managed = self.event_stream
    managed.clear()

log(kind, message, severity='INFO', *, display_message=None, action_name=None)

Log directly to a session's event stream, creating it on demand.

The manager records debug breadcrumbs around stream creation to aid in tracing concurrent tasks. Returned indices match those produced by :meth:core.event_stream.event_stream.EventStream.log and can be used to correlate updates.

Parameters:

Name Type Description Default
session_id

Target stream identifier; a new stream is created when none exists.

required
kind str

Event family such as "action_start" or "warn".

required
message str

Main event text.

required
severity str

Importance level, defaulting to "INFO".

'INFO'
display_message str | None

Optional trimmed message for UI surfaces.

None
action_name str | None

Optional action label for file-based externalization.

None

Returns:

Type Description
int

Index of the logged event within the target stream's tail.

Source code in core\event_stream\event_stream_manager.py
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
def log(
    self,
    kind: str,
    message: str,
    severity: str = "INFO",
    *,
    display_message: str | None = None,
    action_name: str | None = None,
) -> int:
    """
    Log an event to the managed event stream.

    A debug breadcrumb is written through the module-level ``logger`` first;
    the arguments are then forwarded unchanged to the ``log`` method of the
    stream returned by ``self.get_stream()``.

    Args:
        kind: Event family such as ``"action_start"`` or ``"warn"``.
        message: Main event text.
        severity: Importance level, defaulting to ``"INFO"``.
        display_message: Optional trimmed message for UI surfaces.
        action_name: Optional action label for file-based externalization.

    Returns:
        The value returned by the stream's ``log`` call (annotated as the
        integer index of the logged event).
    """
    logger.debug(f"Process Started - Logging event to stream: [{severity}] {kind} - {message}")
    stream = self.get_stream()
    return stream.log(
        kind,
        message,
        severity,
        display_message=display_message,
        action_name=action_name,
    )

snapshot(include_summary=True)

Return a prompt snapshot of a specific session, or '(no events)' if not found.

Source code in core\event_stream\event_stream_manager.py
75
76
77
78
79
80
def snapshot(self, include_summary: bool = True) -> str:
    """Return the stream's prompt snapshot, or ``'(no events)'`` if the stream is falsy.

    ``include_summary`` is forwarded as-is to the stream's
    ``to_prompt_snapshot``.
    """
    current = self.get_stream()
    if current:
        return current.to_prompt_snapshot(include_summary=include_summary)
    # Falsy stream object: fall back to the fixed placeholder text.
    return "(no events)"