# Listing metadata carried over from the source viewer: 605 lines, 24 KiB, Python.
import json
import math
import os
import re
import time
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional, Tuple

import numpy as np
from openai import OpenAI
from sklearn.metrics.pairwise import cosine_similarity


# Initialize the shared OpenAI client used by every LLMAgent in this module.
#
# NOTE: the OpenAI constructor (openai>=1.0) raises when no API key is
# available, which would abort the module at import time -- before main() can
# print its "Set OPENAI_API_KEY" hint. Fall back to a placeholder so the client
# can always be constructed; requests made with the placeholder fail with an
# authentication error, which LLMAgent already converts into its error
# fallbacks.
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY") or "missing-openai-api-key")


@dataclass
class Memory:
    """A single entry in an agent's memory stream.

    ``last_accessed`` may be passed as ``None``; it is then initialised to
    ``creation_time`` in ``__post_init__``.
    """

    description: str  # Natural-language content of the memory
    creation_time: datetime  # When the memory was recorded
    last_accessed: Optional[datetime]  # None -> defaults to creation_time
    importance_score: int  # 1-10 scale
    embedding: Optional[List[float]] = None  # Vector used for relevance scoring
    memory_type: str = "observation"  # observation, reflection, plan
    related_memories: List[int] = field(default_factory=list)  # IDs of supporting memories

    def __post_init__(self):
        """Default ``last_accessed`` to ``creation_time`` when it was not given."""
        if self.last_accessed is None:
            self.last_accessed = self.creation_time


class LLMAgent:
    """Thin wrapper around the module-level OpenAI ``client``.

    Both methods are best-effort: they never raise. ``chat`` reports failures
    as a string beginning with ``"[LLM Error: "`` and ``get_embedding`` falls
    back to a zero vector.
    """

    # Length of the zero vector returned when an embedding request fails
    # (the size the original code hard-coded as its "default embedding size").
    _FALLBACK_EMBEDDING_DIM = 1536

    def __init__(
        self,
        model: str = "gpt-3.5-turbo",
        temperature: float = 0.8,
        embedding_model: str = "text-embedding-ada-002",
    ):
        """Store the chat model name, sampling temperature and embedding model name."""
        self.model = model
        self.temperature = temperature
        self.embedding_model = embedding_model

    def chat(self, messages: List[Dict[str, str]], max_tokens: int = 200) -> str:
        """Send ``messages`` to the chat model and return the stripped reply text.

        Returns:
            The reply text, or ``"[LLM Error: ...]"`` if the request failed or
            the reply carried no text content.
        """
        try:
            response = client.chat.completions.create(
                model=self.model,
                messages=messages,
                temperature=self.temperature,
                max_tokens=max_tokens,
            )
            content = response.choices[0].message.content
            if content is None:
                # Report a missing reply explicitly instead of letting
                # ``None.strip()`` surface as an opaque AttributeError message.
                return "[LLM Error: empty response]"
            return content.strip()
        except Exception as e:
            return f"[LLM Error: {str(e)}]"

    def get_embedding(self, text: str) -> List[float]:
        """Get embedding for memory relevance scoring.

        Returns:
            The embedding vector for ``text``, or a zero vector of length
            ``_FALLBACK_EMBEDDING_DIM`` if the request failed (the error is
            printed).
        """
        try:
            response = client.embeddings.create(
                model=self.embedding_model,
                input=text,
            )
            return response.data[0].embedding
        except Exception as e:
            print(f"Embedding error: {e}")
            return [0.0] * self._FALLBACK_EMBEDDING_DIM


@dataclass
class Character:
    """Static description of a roleplay character."""

    name: str
    age: int
    personality: str
    occupation: str
    location: str
    relationships: Dict[str, str] = field(default_factory=dict)  # other person's name -> description
    goals: List[str] = field(default_factory=list)


class MemoryStream:
    """Memory store with LLM-scored importance, reflection, and scored retrieval.

    Observations are appended with an importance score and an embedding. Once
    the importance accumulated since the last reflection reaches
    ``importance_threshold``, higher-level "reflection" memories are generated
    from recent observations. Retrieval ranks memories by
    recency + importance + relevance.
    """

    # Prefix LLMAgent.chat uses when it reports a failure as text.
    _LLM_ERROR_PREFIX = "[LLM Error"
    # Importance used whenever the LLM does not yield a usable rating.
    _DEFAULT_IMPORTANCE = 5
    # Fixed importance assigned to generated reflections.
    _REFLECTION_IMPORTANCE = 7
    # Number of most recent memories considered when reflecting.
    _REFLECTION_WINDOW = 20
    # Per-hour multiplicative decay applied to the recency score.
    _RECENCY_DECAY_PER_HOUR = 0.995

    def __init__(self, llm_agent: "LLMAgent"):
        """Create an empty stream that uses ``llm_agent`` for scoring and embeddings."""
        self.memories: List["Memory"] = []
        self.memory_counter = 0
        self.llm = llm_agent
        self.importance_threshold = 150  # Reflection trigger threshold
        self.recent_importance_sum = 0

    def add_observation(self, description: str) -> int:
        """Add a new observation with importance scoring.

        Returns:
            The ID of the new memory (its index in ``self.memories``).
        """
        importance = self._score_importance(description)
        now = datetime.now()

        memory = Memory(
            description=description,
            creation_time=now,
            last_accessed=now,
            importance_score=importance,
            memory_type="observation",
        )
        # Embedding is used for relevance scoring during retrieval.
        memory.embedding = self.llm.get_embedding(description)

        memory_id = self.memory_counter
        self.memories.append(memory)
        self.memory_counter += 1

        # Accumulate importance and reflect once the threshold is reached.
        self.recent_importance_sum += importance
        if self.recent_importance_sum >= self.importance_threshold:
            self._generate_reflections()
            self.recent_importance_sum = 0

        return memory_id

    def _score_importance(self, description: str) -> int:
        """Ask the LLM to rate the memory's poignancy on a 1-10 scale.

        The first integer in the reply is used, clamped to [1, 10], so a reply
        of "10" scores 10 rather than 1. Error replies, replies without a
        number, and unexpected exceptions all yield the default of 5.
        """
        prompt = (
            "On the scale of 1 to 10, where 1 is purely mundane (e.g., brushing teeth, "
            "making bed) and 10 is extremely poignant (e.g., a break up, college "
            "acceptance), rate the likely poignancy of the following piece of memory.\n"
            "\n"
            f"Memory: {description}\n"
            "Rating: "
        )

        try:
            response = self.llm.chat([{"role": "user", "content": prompt}], max_tokens=5)
        except Exception:
            return self._DEFAULT_IMPORTANCE

        # An error message may contain digits (status codes, model names) that
        # must not be mistaken for a rating.
        if not isinstance(response, str) or response.startswith(self._LLM_ERROR_PREFIX):
            return self._DEFAULT_IMPORTANCE

        match = re.search(r"\d+", response)
        if match is None:
            return self._DEFAULT_IMPORTANCE
        return max(1, min(10, int(match.group())))

    def _generate_reflections(self):
        """Generate high-level reflections from recent observations.

        Looks at the observations among the last ``_REFLECTION_WINDOW``
        memories, asks the LLM for insights in the form
        ``insight (because of 1, 3, 5)``, and stores each parsed insight as a
        "reflection" memory. The statement numbers cited in the parentheses are
        mapped back to memory IDs and recorded in ``related_memories``.
        Does nothing if fewer than 3 recent observations exist.
        """
        window_start = max(0, len(self.memories) - self._REFLECTION_WINDOW)
        # Keep each observation's index in self.memories (its ID) so citations
        # in the LLM output can be resolved back to concrete memories.
        recent = [
            (memory_id, memory)
            for memory_id, memory in enumerate(self.memories[window_start:], start=window_start)
            if memory.memory_type == "observation"
        ]
        if len(recent) < 3:
            return

        memory_descriptions = "\n".join(
            f"{number}. {memory.description}"
            for number, (_, memory) in enumerate(recent, start=1)
        )

        # NOTE: the original also sent a "3 most salient questions" prompt here
        # but never used the reply; that discarded LLM call has been removed.
        insight_prompt = (
            "Statements:\n"
            f"{memory_descriptions}\n"
            "\n"
            "What 5 high-level insights can you infer from the above statements?\n"
            "Format: insight (because of 1, 3, 5)"
        )

        try:
            insights_response = self.llm.chat([{"role": "user", "content": insight_prompt}])
            if insights_response.startswith(self._LLM_ERROR_PREFIX):
                # Do not turn an error message into reflection memories.
                print(f"Reflection generation error: {insights_response}")
                return

            for line in insights_response.split("\n"):
                if "(" not in line or ")" not in line:
                    continue

                insight_text, _, evidence = line.partition("(")
                # Drop a leading list marker such as "1." / "2)" / "-".
                insight = re.sub(r"^\s*(?:\d+[.)]|[-*\u2022])\s*", "", insight_text).strip()
                if len(insight) <= 10:
                    continue

                # Resolve the cited 1-based statement numbers to memory IDs,
                # ignoring numbers that do not refer to a listed statement.
                cited_ids = []
                for token in re.findall(r"\d+", evidence.split(")")[0]):
                    number = int(token)
                    if 1 <= number <= len(recent):
                        cited_ids.append(recent[number - 1][0])

                now = datetime.now()
                reflection = Memory(
                    description=f"Reflection: {insight}",
                    creation_time=now,
                    last_accessed=now,
                    importance_score=self._REFLECTION_IMPORTANCE,  # Reflections are generally important
                    memory_type="reflection",
                    embedding=self.llm.get_embedding(insight),
                    related_memories=cited_ids,
                )
                self.memories.append(reflection)
                self.memory_counter += 1

        except Exception as e:
            print(f"Reflection generation error: {e}")

    @staticmethod
    def _cosine(vec_a, vec_b) -> float:
        """Return the cosine similarity of two vectors, or 0.0 if it is undefined.

        Undefined covers a missing or empty vector, mismatched lengths, and
        zero-norm vectors (e.g. the zero-vector embedding fallback).
        """
        if vec_a is None or vec_b is None or len(vec_a) == 0 or len(vec_b) == 0:
            return 0.0
        a = np.asarray(vec_a, dtype=float)
        b = np.asarray(vec_b, dtype=float)
        if a.shape != b.shape:
            return 0.0
        denominator = np.linalg.norm(a) * np.linalg.norm(b)
        if denominator == 0.0:
            return 0.0
        return float(np.dot(a, b) / denominator)

    def retrieve_memories(self, query: str, k: int = 10) -> List["Memory"]:
        """Retrieve the ``k`` best memories by recency + importance + relevance.

        Recency decays exponentially with the hours since a memory was last
        accessed, importance is the stored score scaled to 0.1-1.0, and
        relevance is the cosine similarity between the query embedding and the
        memory embedding. Only the memories actually returned have their
        ``last_accessed`` refreshed, and only after scoring -- refreshing
        first would make every recency score 1.0.

        Returns:
            Up to ``k`` memories, best first; an empty list if there are no
            memories or ``k`` is not positive.
        """
        if not self.memories or k <= 0:
            return []

        query_embedding = self.llm.get_embedding(query)
        current_time = datetime.now()
        scores = []

        for i, memory in enumerate(self.memories):
            hours_since_accessed = (current_time - memory.last_accessed).total_seconds() / 3600
            recency = self._RECENCY_DECAY_PER_HOUR ** max(hours_since_accessed, 0.0)

            # Importance (already scored 1-10)
            importance = memory.importance_score / 10.0

            relevance = self._cosine(query_embedding, memory.embedding)

            # Combined score with equal weighting of the three components
            scores.append((recency + importance + relevance, i, memory))

        # Stable sort: ties keep insertion order.
        scores.sort(reverse=True, key=lambda item: item[0])
        retrieved = [memory for _, _, memory in scores[:k]]

        for memory in retrieved:
            memory.last_accessed = current_time

        return retrieved


class CharacterAgent:
    """Character-driven agent backed by a MemoryStream.

    The agent seeds its memory stream from the character sheet, records new
    observations, and builds LLM prompts (plans, reactions, summaries) from
    retrieved memories.
    """

    # Prefix LLMAgent.chat uses when it reports a failure as text.
    _LLM_ERROR_PREFIX = "[LLM Error"
    # Plan used when the LLM returns nothing or planning fails.
    _FALLBACK_PLAN = "1) Go about my daily routine"

    def __init__(self, character: "Character", llm: "LLMAgent"):
        """Create the agent and seed its memory stream with background facts."""
        self.character = character
        self.llm = llm
        self.memory_stream = MemoryStream(llm)
        self.current_plan: List[str] = []

        # Initialize with character background
        self._initialize_memories()

    def _is_llm_error(self, text) -> bool:
        """Return True if ``text`` is an error message produced by LLMAgent.chat."""
        return isinstance(text, str) and text.startswith(self._LLM_ERROR_PREFIX)

    def _initialize_memories(self):
        """Seed the memory stream with identity, relationships and goals.

        Goals are included so that planning queries mentioning "goals" have
        something to retrieve; the original left ``character.goals`` unused.
        """
        who = self.character
        background_facts = [
            f"My name is {who.name} and I am {who.age} years old",
            f"My personality: {who.personality}",
            f"My occupation: {who.occupation}",
            f"I live in {who.location}",
        ]
        background_facts.extend(
            f"My relationship with {person}: {relationship}"
            for person, relationship in who.relationships.items()
        )
        background_facts.extend(f"One of my goals: {goal}" for goal in who.goals)

        for fact in background_facts:
            self.memory_stream.add_observation(fact)

    def perceive(self, observation: str) -> None:
        """Add new observation to memory stream."""
        self.memory_stream.add_observation(observation)

    def plan_day(self) -> List[str]:
        """Generate a high-level daily plan and remember it.

        Returns:
            A one-element list holding the whole plan text (prefixed with
            ``"1)"``), or the fallback plan. The result is also stored in
            ``self.current_plan``. LLM error replies are returned but not
            written to memory.
        """
        # Retrieve relevant memories about goals, habits, schedule
        relevant_memories = self.memory_stream.retrieve_memories(
            f"{self.character.name} daily routine goals schedule", k=5
        )
        memory_context = "\n".join(m.description for m in relevant_memories)

        plan_prompt = (
            f"You are {self.character.name}.\n"
            f"Background: {self.character.personality}\n"
            f"Occupation: {self.character.occupation}\n"
            "\n"
            "Relevant memories:\n"
            f"{memory_context}\n"
            "\n"
            "Plan your day in broad strokes (5-8 activities with times):\n"
            "1)"
        )

        try:
            response = self.llm.chat([{"role": "user", "content": plan_prompt}], max_tokens=300)
            # The prompt ends with "1)", so the reply continues that first item.
            plan_steps = [f"1){response}"] if response else [self._FALLBACK_PLAN]
            self.current_plan = plan_steps

            # Add plan to memory, unless the "plan" is just an error message.
            if not self._is_llm_error(response):
                plan_description = f"Daily plan: {'; '.join(plan_steps)}"
                self.memory_stream.add_observation(plan_description)

            return plan_steps
        except Exception:
            self.current_plan = [self._FALLBACK_PLAN]
            return self.current_plan

    def react_to_situation(self, situation: str) -> str:
        """Generate an in-character reaction to ``situation`` and remember it.

        Returns:
            The LLM reply. An LLM error reply is returned as-is but is not
            stored as a memory. A generic line is returned if anything raises.
        """
        relevant_memories = self.memory_stream.retrieve_memories(situation, k=8)
        memory_context = "\n".join(f"- {m.description}" for m in relevant_memories)

        reaction_prompt = (
            f"You are {self.character.name}.\n"
            f"Age: {self.character.age}\n"
            f"Personality: {self.character.personality}\n"
            f"Current location: {self.character.location}\n"
            "\n"
            "Relevant memories from your past:\n"
            f"{memory_context}\n"
            "\n"
            f"Current situation: {situation}\n"
            "\n"
            "How do you react? Stay completely in character and be specific "
            "about what you would do or say."
        )

        try:
            response = self.llm.chat([{"role": "user", "content": reaction_prompt}])

            # Add reaction to memory, but never memorise an error message.
            if not self._is_llm_error(response):
                self.memory_stream.add_observation(f"I reacted to '{situation}' by: {response}")

            return response
        except Exception:
            return "I'm not sure how to respond to that."

    def get_summary(self) -> str:
        """Generate a short LLM summary of the character.

        The prompt uses the 3 most recent reflections plus the 5 most recent
        non-reflection memories, so a recent reflection is never listed twice.
        Falls back to a one-line description if the LLM call raises.
        """
        memories = self.memory_stream.memories
        reflections = [m for m in memories if m.memory_type == "reflection"]
        other_memories = [m for m in memories if m.memory_type != "reflection"]

        summary_memories = reflections[-3:] + other_memories[-5:]
        memory_context = "\n".join(m.description for m in summary_memories)

        summary_prompt = (
            "Based on the following memories and reflections, provide a brief summary "
            f"of who {self.character.name} is and what they care about:\n"
            "\n"
            f"{memory_context}\n"
            "\n"
            "Summary:"
        )

        try:
            return self.llm.chat([{"role": "user", "content": summary_prompt}], max_tokens=150)
        except Exception:
            return f"{self.character.name} is a {self.character.age}-year-old {self.character.occupation}."


class SceneManager:
    """Holds the shared scene state and the agents taking part in it."""

    def __init__(self, main_llm: "LLMAgent"):
        """Create an empty scene set in a coffee shop at the current wall-clock time."""
        self.main_llm = main_llm
        self.characters: Dict[str, "Character"] = {}
        self.agents: Dict[str, "CharacterAgent"] = {}
        self.scene_state = {
            "location": "cozy coffee shop",
            "time": "afternoon",
            "atmosphere": "quiet and peaceful",
            "active_conversations": [],
            "events": [],
        }
        self.global_time = datetime.now()

    def add_character(self, character: "Character"):
        """Register ``character`` and create its agent with its own LLMAgent."""
        self.characters[character.name] = character
        agent = CharacterAgent(character, LLMAgent("gpt-3.5-turbo", temperature=0.9))
        self.agents[character.name] = agent
        print(f"✓ Added {character.name} to the scene")

    def advance_time(self, hours: int = 1):
        """Advance the scene clock and let every agent perceive the new time.

        This only records a "Time is now ..." observation per agent; it does
        not itself ask agents to plan.
        """
        self.global_time += timedelta(hours=hours)
        self.scene_state["time"] = self.global_time.strftime("%I:%M %p")

        for agent in self.agents.values():
            agent.perceive(f"Time is now {self.scene_state['time']}")

    def character_interaction(self, char1_name: str, char2_name: str, context: str) -> Dict[str, str]:
        """Handle a one-round exchange between two characters.

        Both agents perceive the context, the first reacts to it, the second
        reacts to what the first said, and both remember the exchange.

        Returns:
            ``{char1_name: response, char2_name: response}``, or
            ``{"error": message}`` if a name is unknown or both names are the
            same character.
        """
        if char1_name not in self.agents or char2_name not in self.agents:
            return {"error": "Character not found"}
        if char1_name == char2_name:
            # A self-conversation would collapse the result dict to one key and
            # make the agent perceive everything twice.
            return {"error": "A character cannot interact with themselves"}

        char1_agent = self.agents[char1_name]
        char2_agent = self.agents[char2_name]

        # Both characters observe the interaction context
        char1_agent.perceive(f"Interacting with {char2_name}: {context}")
        char2_agent.perceive(f"Interacting with {char1_name}: {context}")

        # Generate responses
        char1_response = char1_agent.react_to_situation(
            f"You are talking with {char2_name}. Context: {context}"
        )
        char2_response = char2_agent.react_to_situation(f"{char1_name} said: '{char1_response}'")

        # Both remember the conversation
        char1_agent.perceive(
            f"Conversation with {char2_name}: I said '{char1_response}', they replied '{char2_response}'"
        )
        char2_agent.perceive(
            f"Conversation with {char1_name}: They said '{char1_response}', I replied '{char2_response}'"
        )

        return {
            char1_name: char1_response,
            char2_name: char2_response,
        }


class EnhancedRoleplaySystem:
    """Facade used by the command loop: owns the scene and formats results as text."""

    def __init__(self):
        """Create the scene manager and populate it with the demo characters."""
        self.scene_manager = SceneManager(LLMAgent("gpt-4o-mini", temperature=0.7))
        self.setup_characters()

    def _find_agent(self, character_name: str):
        """Return the agent registered under ``character_name``, or None."""
        return self.scene_manager.agents.get(character_name)

    def setup_characters(self):
        """Create the three demo characters and add them to the scene."""
        # Create characters with rich backgrounds for testing memory
        alice = Character(
            name="Alice",
            age=23,
            personality="Introverted literature student who loves mystery novels and gets nervous in social situations but is very observant",
            occupation="Graduate student studying Victorian literature",
            location="coffee shop",
            relationships={
                "Professor Wilson": "My thesis advisor - supportive but demanding",
                "Emma": "Friendly barista I have a secret crush on",
            },
            goals=["Finish thesis chapter", "Work up courage to talk to Emma", "Find rare book for research"],
        )

        bob = Character(
            name="Bob",
            age=28,
            personality="Confident software developer, outgoing and helpful, loves solving technical problems",
            occupation="Senior fullstack developer at local startup",
            location="coffee shop",
            relationships={
                "Alice": "Quiet regular I've seen around - seems nice",
                "Emma": "Friendly barista, always remembers my order",
            },
            goals=["Launch new feature this week", "Ask someone interesting on a date", "Learn more about AI"],
        )

        emma = Character(
            name="Emma",
            age=25,
            personality="Energetic art student working as barista, cheerful and social, dreams of opening gallery",
            occupation="Barista and art student",
            location="coffee shop counter",
            relationships={
                "Alice": "Sweet regular who seems shy - orders same drink daily",
                "Bob": "Tech guy regular - always friendly and tips well",
            },
            goals=["Save money for art supplies", "Organize local art show", "Connect with more creative people"],
        )

        for character in (alice, bob, emma):
            self.scene_manager.add_character(character)

    def get_character_response(self, character_name: str, user_input: str) -> str:
        """Have ``character_name`` perceive and then react to ``user_input``.

        Returns the reaction text, or a "not found" message for unknown names.
        """
        agent = self._find_agent(character_name)
        if agent is None:
            return f"❌ Character {character_name} not found!"

        print(f"🧠 {character_name} accessing memories...")

        # Agent perceives user interaction
        agent.perceive(f"Someone asked me: '{user_input}'")

        # Generate response
        return agent.react_to_situation(user_input)

    def character_chat(self, char1: str, char2: str, context: str) -> str:
        """Make two characters interact and return the formatted exchange.

        Returns the scene manager's error message if the interaction failed.
        """
        interaction = self.scene_manager.character_interaction(char1, char2, context)

        if "error" in interaction:
            return interaction["error"]

        return f"\n💬 **{char1}**: {interaction[char1]}\n💬 **{char2}**: {interaction[char2]}\n"

    def advance_scene_time(self, hours: int = 1):
        """Advance the scene clock and return a status line with the new time."""
        self.scene_manager.advance_time(hours)
        return f"⏰ Advanced time by {hours} hour(s). Current time: {self.scene_manager.scene_state['time']}"

    def get_character_memories(self, character_name: str, memory_type: str = "all") -> str:
        """Show a character's memory stream for debugging.

        Lists the last 10 memories, optionally filtered to one ``memory_type``;
        the header reports the total number of matching memories.
        """
        agent = self._find_agent(character_name)
        if agent is None:
            return f"Character {character_name} not found"

        memories = agent.memory_stream.memories
        if memory_type != "all":
            memories = [m for m in memories if m.memory_type == memory_type]

        header = f"\n🧠 {character_name}'s {memory_type} memories ({len(memories)} total):\n"
        entries = [
            f"{position}. [{memory.memory_type}] {memory.description} (importance: {memory.importance_score})\n"
            for position, memory in enumerate(memories[-10:], start=1)  # Show last 10
        ]
        return header + "".join(entries)

    def get_character_summary(self, character_name: str) -> str:
        """Return the agent's own summary of the character, formatted for display."""
        agent = self._find_agent(character_name)
        if agent is None:
            return f"Character {character_name} not found"

        summary = agent.get_summary()
        return f"\n📝 Current summary of {character_name}:\n{summary}\n"


def _print_banner():
    """Print the program title, feature overview and command reference."""
    print("🎭 Advanced Multi-Agent Roleplay with Stanford Memory Architecture")
    print("=" * 70)
    print("This implements Stanford's proven memory system:")
    print("• Memory Stream: observations, reflections, plans")
    print("• Smart Retrieval: recency + importance + relevance")
    print("• Auto Reflection: generates insights when importance threshold hit")
    print("• Natural Forgetting: older memories become less accessible")
    print()
    print("🎯 COMMANDS:")
    print(" talk <character> <message> - Character responds using their memories")
    print(" chat <char1> <char2> <context> - Two characters interact")
    print(" time <hours> - Advance time, triggers planning")
    print(" memories <character> [type] - Show character's memories")
    print(" summary <character> - AI summary of character")
    print(" status - Show scene status")
    print(" quit - Exit")
    print()


def _seed_initial_memories(system):
    """Give the demo agents a few starting experiences."""
    agents = system.scene_manager.agents
    print("🌱 Setting up initial memories...")
    agents["Alice"].perceive("I spilled coffee on my notes yesterday - so embarrassing")
    agents["Alice"].perceive("Emma helped me clean up and was really sweet about it")
    agents["Bob"].perceive("Shipped a major feature at work - feeling accomplished")
    agents["Emma"].perceive("A shy regular (Alice) has been coming in every day this week")
    print("✓ Initial memories established")
    print()


def _print_status(system):
    """Print the scene location, time, characters and per-agent memory counts."""
    scene = system.scene_manager
    print(f"\n📍 Scene: {scene.scene_state['location']}")
    print(f"⏰ Time: {scene.scene_state['time']}")
    print(f"👥 Characters: {', '.join(scene.characters.keys())}")
    for name, agent in scene.agents.items():
        memories = agent.memory_stream.memories
        mem_count = len(memories)
        reflections = sum(1 for m in memories if m.memory_type == "reflection")
        print(f" {name}: {mem_count} memories ({reflections} reflections)")
    print()


def _handle_command(system, command):
    """Execute one command line; return False when the loop should stop."""
    if command == "quit":
        print("👋 Goodbye!")
        return False

    if command == "status":
        _print_status(system)

    elif command.startswith("talk "):
        parts = command.split(" ", 2)
        if len(parts) >= 3:
            character, message = parts[1], parts[2]
            print(f"\n🗣️ You to {character}: {message}")
            response = system.get_character_response(character, message)
            print(f"💬 {character}: {response}\n")
        else:
            print("❓ Usage: talk <character> <message>")

    elif command.startswith("chat "):
        parts = command.split(" ", 3)
        if len(parts) >= 4:
            char1, char2, context = parts[1], parts[2], parts[3]
            print(f"\n🎬 Setting up interaction: {context}")
            print(system.character_chat(char1, char2, context))
        else:
            print("❓ Usage: chat <character1> <character2> <context>")

    elif command.startswith("time "):
        # Only the argument parsing is guarded, so an IndexError/ValueError
        # raised while planning is not misreported as a usage error.
        try:
            hours = int(command.split()[1])
        except (IndexError, ValueError):
            print("❓ Usage: time <hours>")
            return True
        print(system.advance_scene_time(hours))
        # Show what characters are planning
        for name, agent in system.scene_manager.agents.items():
            plan = agent.plan_day()
            print(f"📅 {name}'s plan: {plan[0] if plan else 'No specific plans'}")

    elif command.startswith("memories "):
        parts = command.split()
        character = parts[1] if len(parts) > 1 else ""
        memory_type = parts[2] if len(parts) > 2 else "all"
        if character:
            print(system.get_character_memories(character, memory_type))
        else:
            print("❓ Usage: memories <character> [observation/reflection/plan/all]")

    elif command.startswith("summary "):
        parts = command.split()
        character = parts[1] if len(parts) > 1 else ""
        if character:
            print(system.get_character_summary(character))
        else:
            print("❓ Usage: summary <character>")

    else:
        print("❓ Commands: talk, chat, time, memories, summary, status, quit")

    return True


def main():
    """Run the interactive roleplay command loop.

    Prints the banner, builds the demo system, seeds initial memories, then
    reads commands until ``quit``, Ctrl-C, or end of input. Any other
    exception raised by a command is printed and the loop continues.
    """
    _print_banner()

    if not os.getenv("OPENAI_API_KEY"):
        print("⚠️ Set OPENAI_API_KEY environment variable to use real LLMs")
        print()

    system = EnhancedRoleplaySystem()

    # Give agents some initial experiences
    _seed_initial_memories(system)

    print("🧪 TRY THESE EXPERIMENTS:")
    print("1. talk Alice How are you feeling today?")
    print("2. time 2 (advance time to trigger reflection)")
    print("3. memories Alice reflection (see generated insights)")
    print("4. chat Alice Emma You both seem to be here often")
    print("5. summary Alice (see how memories shaped character)")
    print()

    while True:
        try:
            command = input("> ").strip()
            if not _handle_command(system, command):
                break
        except (KeyboardInterrupt, EOFError):
            # EOFError must end the loop: input() keeps raising it once stdin
            # is closed, which previously produced an endless stream of errors.
            print("\n👋 Goodbye!")
            break
        except Exception as e:
            print(f"💥 Error: {e}")


# Run the interactive loop only when executed as a script, not on import.
if __name__ == "__main__":
    main()