# Memory stream for generative agents: observation, reflection, and retrieval.
from collections.abc import Callable, Coroutine, Awaitable
|
|
from typing import List, Optional
|
|
from datetime import datetime
|
|
|
|
from sklearn.metrics.pairwise import cosine_similarity
|
|
from living_agents import LLMAgent, Memory, Character, PromptManager
|
|
from llm_connector import LLMMessage
|
|
|
|
|
|
class MemoryStream:
    """Stanford's memory architecture with observation, reflection, and planning"""

    def __init__(self, llm_agent: LLMAgent):
        # All stored memories (observations and reflections), in insertion order.
        self.memories: List[Memory] = []
        # LLM agent used for embeddings, chat, and structured responses.
        self.llm = llm_agent
        # Once the summed importance of recent observations reaches this
        # value, a reflection pass is triggered (see add_observation).
        self.importance_threshold = 150  # Reflection trigger threshold
        # Running sum of importance scores accumulated since the last reflection.
        self.recent_importance_sum = 0
|
|
|
|
async def add_memory(self, memory: Memory):
|
|
memory.embedding = await self.llm.get_embedding(memory.description)
|
|
self.memories.append(memory)
|
|
|
|
async def add_observation(self, description: str, scoring_func: Optional[Callable[[Memory], Awaitable[int]]] = None) -> Memory:
|
|
"""Add a new observation with importance scoring"""
|
|
|
|
memory = Memory(
|
|
description=description,
|
|
creation_time=datetime.now(),
|
|
last_accessed=datetime.now(),
|
|
importance_score=5,
|
|
memory_type="observation"
|
|
)
|
|
|
|
if scoring_func:
|
|
memory.importance_score = await scoring_func(memory)
|
|
|
|
# Get embedding for retrieval
|
|
memory.embedding = await self.llm.get_embedding(description)
|
|
|
|
self.memories.append(memory)
|
|
|
|
# Track for reflection trigger
|
|
self.recent_importance_sum += memory.importance_score
|
|
|
|
# Trigger reflection if threshold exceeded
|
|
if self.recent_importance_sum >= self.importance_threshold:
|
|
print("Reflection triggered.")
|
|
await self._generate_reflections()
|
|
self.recent_importance_sum = 0
|
|
|
|
return memory
|
|
|
|
    async def _generate_reflections(self):
        """Generate high-level reflections from recent memories"""
        # Consider only the last 20 memories, and only plain observations
        # (reflections themselves are excluded from the source material).
        recent_memories = [m for m in self.memories[-20:] if m.memory_type == "observation"]

        # Too little material to reflect on; skip silently.
        if len(recent_memories) < 3:
            return

        # Number the observations so the prompt can reference them.
        memory_descriptions = "\n".join([f"{i + 1}. {m.description}" for i, m in enumerate(recent_memories)])

        prompt, schema = PromptManager.get_prompt_with_schema('generate_reflection', {'{{recent_observations}}': memory_descriptions})

        messages: List[LLMMessage] = [{'role': 'user', 'content': prompt}]

        # NOTE(review): `response` is never used below — the structured
        # reflection-question step appears to be unfinished wiring; confirm
        # whether its output was meant to feed `insight_prompt`.
        response = await self.llm.client.get_structured_response(messages, schema)

        # NOTE(review): insight_prompt is empty, so this chat call sends an
        # empty user message — looks incomplete; verify intended prompt.
        insight_prompt = ''

        insights_response = await self.llm.chat([{"role": "user", "content": insight_prompt}])

        # Parse insights and create reflection memories. Expected line shape:
        # "<insight text> (<evidence refs>)" — only lines containing both
        # parentheses are treated as insights.
        for line in insights_response.split('\n'):
            if '(' in line and ')' in line:
                # Keep only the text before the evidence parenthetical.
                insight = line.split('(')[0].strip()
                # Discard empty/trivially short fragments.
                if insight and len(insight) > 10:
                    # Create reflection memory
                    reflection = Memory(
                        description=f"Reflection: {insight}",
                        creation_time=datetime.now(),
                        last_accessed=datetime.now(),
                        importance_score=7,  # Reflections are generally important
                        memory_type="reflection",
                        embedding=await self.llm.get_embedding(insight)
                    )
                    self.memories.append(reflection)
|
|
|
|
async def get_related_memories_for_scoring(self, memory_text: str, exclude_self=None, k=5) -> List:
|
|
"""Get memories related to the one being scored"""
|
|
# Get embedding for the memory being scored
|
|
memory_embedding = await self.llm.get_embedding(memory_text)
|
|
|
|
# Calculate similarity to other memories
|
|
similarities = []
|
|
for mem in self.memories:
|
|
if mem == exclude_self:
|
|
continue
|
|
|
|
if mem.embedding:
|
|
from sklearn.metrics.pairwise import cosine_similarity
|
|
similarity = cosine_similarity([memory_embedding], [mem.embedding])[0][0]
|
|
similarities.append((similarity, mem))
|
|
|
|
# Return top K most similar memories
|
|
similarities.sort(reverse=True, key=lambda x: x[0])
|
|
return [mem for _, mem in similarities[:k]]
|
|
|
|
async def retrieve_related_memories(self, query: str, k: int = 10) -> List[Memory]:
|
|
"""Retrieve relevant memories using recency, importance, relevance"""
|
|
if not self.memories:
|
|
return []
|
|
|
|
query_embedding = await self.llm.get_embedding(query)
|
|
current_time = datetime.now()
|
|
scores = []
|
|
|
|
for i, memory in enumerate(self.memories):
|
|
# Update last accessed
|
|
memory.last_accessed = current_time
|
|
|
|
# Calculate recency (exponential decay)
|
|
hours_since_accessed = (current_time - memory.last_accessed).total_seconds() / 3600
|
|
recency = 0.995 ** hours_since_accessed
|
|
|
|
# Importance (already scored 1-10)
|
|
importance = memory.importance_score / 10.0
|
|
|
|
# Relevance (cosine similarity)
|
|
if memory.embedding and query_embedding:
|
|
relevance = cosine_similarity([query_embedding], [memory.embedding])[0][0]
|
|
else:
|
|
relevance = 0.0
|
|
|
|
# Combined score (equal weighting as in Stanford paper)
|
|
score = recency + importance + relevance
|
|
scores.append((score, i, memory))
|
|
|
|
# Sort by score and return top k
|
|
scores.sort(reverse=True, key=lambda x: x[0])
|
|
return [memory for _, _, memory in scores[:k]]
|