from collections.abc import Callable, Coroutine, Awaitable
from typing import List, Optional
from datetime import datetime

from sklearn.metrics.pairwise import cosine_similarity

from living_agents import LLMAgent, Memory, Character, PromptManager
from llm_connector import LLMMessage


class MemoryStream:
    """Stanford's memory architecture with observation, reflection, and planning.

    Stores a flat list of Memory objects and implements the generative-agents
    retrieval score: recency + importance + relevance (equal weighting).
    Reflections are synthesized when the running sum of observation importance
    crosses `importance_threshold`.
    """

    def __init__(self, llm_agent: LLMAgent):
        self.memories: List[Memory] = []
        self.llm = llm_agent
        # Reflection trigger: once the summed importance of recent observations
        # reaches this value, _generate_reflections() runs and the sum resets.
        self.importance_threshold = 150
        self.recent_importance_sum = 0

    async def add_memory(self, memory: Memory):
        """Embed a pre-built Memory and append it to the stream."""
        memory.embedding = await self.llm.get_embedding(memory.description)
        self.memories.append(memory)

    async def add_observation(
        self,
        description: str,
        scoring_func: Optional[Callable[[Memory], Awaitable[int]]] = None,
    ) -> Memory:
        """Add a new observation with importance scoring.

        Args:
            description: Natural-language text of the observation.
            scoring_func: Optional async scorer that returns an importance
                value for the new memory; when omitted, a default score of 5
                is used.

        Returns:
            The stored Memory (embedded and appended to the stream).

        Side effects:
            May trigger a reflection pass when the accumulated importance of
            recent observations reaches `importance_threshold`; the
            accumulator is reset afterwards.
        """
        memory = Memory(
            description=description,
            creation_time=datetime.now(),
            last_accessed=datetime.now(),
            importance_score=5,  # default until (optionally) re-scored below
            memory_type="observation",
        )

        if scoring_func:
            memory.importance_score = await scoring_func(memory)

        # Get embedding for retrieval
        memory.embedding = await self.llm.get_embedding(description)
        self.memories.append(memory)

        # Track for reflection trigger
        self.recent_importance_sum += memory.importance_score
        print(f"Recent Importance Sum: {self.recent_importance_sum}")

        # Trigger reflection if threshold exceeded
        if self.recent_importance_sum >= self.importance_threshold:
            print("Reflection triggered.")
            await self._generate_reflections()
            self.recent_importance_sum = 0

        return memory

    async def _generate_reflections(self):
        """Generate high-level reflection memories from recent observations.

        Looks at the last 20 memories, keeps the observations, asks the LLM
        for insights, and stores each parsed insight as a new "reflection"
        memory with a fixed importance of 7.
        """
        # Get recent observations only; skip reflection when there is too
        # little material to reflect on.
        recent_memories = [m for m in self.memories[-20:] if m.memory_type == "observation"]
        if len(recent_memories) < 3:
            return

        # Numbered list of observations fed into the reflection prompt.
        # NOTE: the trailing "\n" inside the f-string is intentional — it
        # reproduces the original formatting (number on one line, text on
        # the next).
        memory_descriptions = "\n".join(
            [f"{i + 1}. \n{m.description}" for i, m in enumerate(recent_memories)]
        )

        prompt, schema = PromptManager.get_prompt_with_schema(
            'generate_reflection',
            {'{{recent_observations}}': memory_descriptions},
        )
        messages: List[LLMMessage] = [{'role': 'user', 'content': prompt}]
        # NOTE(review): `response` is never used below, and `insight_prompt`
        # is empty — this method looks incomplete. Presumably the structured
        # response was meant to feed the insight prompt; confirm intended flow.
        response = await self.llm.client.get_structured_response(messages, schema)

        insight_prompt = ''
        insights_response = await self.llm.chat([{"role": "user", "content": insight_prompt}])

        # Parse insights of the form "<insight> (<evidence refs>)" and store
        # each as a reflection memory.
        for line in insights_response.split('\n'):
            if '(' in line and ')' in line:
                insight = line.split('(')[0].strip()
                if insight and len(insight) > 10:  # skip trivially short fragments
                    reflection = Memory(
                        description=f"Reflection: {insight}",
                        creation_time=datetime.now(),
                        last_accessed=datetime.now(),
                        importance_score=7,  # Reflections are generally important
                        memory_type="reflection",
                        embedding=await self.llm.get_embedding(insight),
                    )
                    self.memories.append(reflection)

    async def retrieve_related_memories(self, query: str, k: int = 10) -> List[Memory]:
        """Retrieve the top-k memories by recency + importance + relevance.

        Args:
            query: Text whose embedding is compared against stored memories.
            k: Maximum number of memories to return.

        Returns:
            Up to k memories, highest combined score first; empty list when
            the stream is empty.
        """
        if not self.memories:
            return []

        query_embedding = await self.llm.get_embedding(query)
        current_time = datetime.now()

        scores = []
        for i, memory in enumerate(self.memories):
            # BUG FIX: recency must be computed from the OLD last_accessed
            # timestamp. The original assigned last_accessed = current_time
            # first, which made hours_since_accessed ~0 and recency always
            # 1.0, disabling the recency term entirely.
            hours_since_accessed = (current_time - memory.last_accessed).total_seconds() / 3600
            recency = 0.995 ** hours_since_accessed  # exponential decay per hour

            # Retrieval counts as an access.
            memory.last_accessed = current_time

            # Importance (already scored 1-10), normalized to [0, 1].
            importance = memory.importance_score / 10.0

            # Relevance: cosine similarity between query and memory embeddings.
            # NOTE(review): truthiness test assumes embeddings are plain lists;
            # a numpy array here would raise — confirm get_embedding's return type.
            if memory.embedding and query_embedding:
                relevance = cosine_similarity([query_embedding], [memory.embedding])[0][0]
            else:
                relevance = 0.0

            # Combined score (equal weighting as in Stanford paper)
            score = recency + importance + relevance
            scores.append((score, i, memory))

        # Sort by score descending and return top k.
        scores.sort(reverse=True, key=lambda x: x[0])
        return [memory for _, _, memory in scores[:k]]