510 lines
19 KiB
Python
510 lines
19 KiB
Python
import asyncio
|
|
import os
|
|
import pickle
|
|
from pprint import pprint
|
|
from datetime import datetime
|
|
from typing import Optional, List
|
|
|
|
import yaml
|
|
from pathlib import Path
|
|
from dotenv import load_dotenv
|
|
from living_agents import CharacterAgent, PromptManager
|
|
import logging
|
|
|
|
from llm_connector import LLMMessage
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def save_character_to_cache(agent: CharacterAgent, template_path: Path):
|
|
"""Save a character agent to cache"""
|
|
cache_path = Path('.cache') / f"{template_path.stem}.pkl"
|
|
|
|
try:
|
|
# Create cache data with metadata
|
|
cache_data = {
|
|
'character': agent.character,
|
|
'memories': agent.memory_stream.memories,
|
|
'template_path': template_path,
|
|
'cached_at': datetime.now(),
|
|
'cache_version': '1.0'
|
|
}
|
|
|
|
with open(cache_path, 'wb') as f:
|
|
pickle.dump(cache_data, f)
|
|
|
|
print(f"💾 Character saved to cache: {cache_path.name}")
|
|
|
|
except Exception as e:
|
|
print(f"⚠️ Warning: Could not save character to cache: {e}")
|
|
|
|
|
|
def load_character_from_cache(template_path: Path) -> Optional[CharacterAgent]:
|
|
"""Load a character agent from cache, return None if not found or invalid"""
|
|
cache_path = Path('.cache') / f"{template_path.stem}.pkl"
|
|
|
|
if not cache_path.exists():
|
|
return None
|
|
|
|
try:
|
|
with open(cache_path, 'rb') as f:
|
|
cache_data = pickle.load(f)
|
|
|
|
# Verify cache data structure
|
|
required_keys = ['character', 'memories', 'template_path', 'cached_at']
|
|
if not all(key in cache_data for key in required_keys):
|
|
print(f"⚠️ Invalid cache file format: {cache_path.name}")
|
|
return None
|
|
|
|
# Check if template file is newer than cache
|
|
template_mtime = Path(template_path).stat().st_mtime
|
|
cache_time = cache_data['cached_at'].timestamp()
|
|
|
|
if template_mtime > cache_time:
|
|
print(f"📝 Template file is newer than cache, will recreate character")
|
|
return None
|
|
|
|
# Reconstruct CharacterAgent
|
|
from living_agents import LLMAgent
|
|
agent = CharacterAgent(cache_data['character'], LLMAgent())
|
|
agent.memory_stream.memories = cache_data['memories']
|
|
|
|
cached_date = cache_data['cached_at'].strftime('%Y-%m-%d %H:%M:%S')
|
|
print(f"💾 Loaded character from cache (created: {cached_date})")
|
|
|
|
return agent
|
|
|
|
except Exception as e:
|
|
print(f"⚠️ Could not load character from cache: {e}")
|
|
return None
|
|
|
|
|
|
def get_cache_info(template_path: Path) -> Optional[dict]:
|
|
"""Get information about cached character"""
|
|
cache_path = Path('.cache') / f"{template_path.stem}.pkl"
|
|
|
|
if not cache_path.exists():
|
|
return None
|
|
|
|
try:
|
|
with open(cache_path, 'rb') as f:
|
|
cache_data = pickle.load(f)
|
|
|
|
return {
|
|
'cache_file': cache_path.name,
|
|
'cached_at': cache_data['cached_at'],
|
|
'character_name': cache_data['character'].name,
|
|
'memory_count': len(cache_data['memories'])
|
|
}
|
|
except:
|
|
return None
|
|
|
|
|
|
class CharacterExplorer:
|
|
"""Interactive explorer for testing CharacterAgent functionality"""
|
|
chat_history: List[LLMMessage]
|
|
|
|
def __init__(self, character_agent: CharacterAgent):
|
|
self.agent = character_agent
|
|
self.chat_history = []
|
|
|
|
async def start_interactive_session(self):
|
|
"""Start the interactive exploration menu"""
|
|
|
|
print(f"\n🎭 Character Explorer - {self.agent.character.name}")
|
|
print("=" * 50)
|
|
|
|
while True:
|
|
self._show_menu()
|
|
|
|
try:
|
|
choice = input("\nChoose option (1-8): ").strip()
|
|
|
|
if choice == "1":
|
|
await self._handle_ask_question()
|
|
if choice == "2":
|
|
await self._handle_chat()
|
|
elif choice == "3":
|
|
await self._handle_list_memories()
|
|
elif choice == "4":
|
|
await self._handle_view_memory()
|
|
elif choice == "5":
|
|
await self._handle_memory_stats()
|
|
elif choice == "6":
|
|
await self._handle_character_summary()
|
|
elif choice == "7":
|
|
# Regenerate character
|
|
if await self._handle_regenerate_character():
|
|
break # Exit to main menu after regeneration
|
|
elif choice == "8":
|
|
print("👋 Goodbye!")
|
|
break
|
|
else:
|
|
print("❌ Invalid choice. Please enter 1-8.")
|
|
|
|
except KeyboardInterrupt:
|
|
print("\n👋 Goodbye!")
|
|
break
|
|
except Exception as e:
|
|
print(f"❌ Error: {e}")
|
|
|
|
def clear_character_cache(self) -> bool:
|
|
"""Clear cached character for a template"""
|
|
cache_file = Path('.cache') / f"{self.agent.character.template_file.stem}.pkl"
|
|
|
|
if cache_file.exists():
|
|
try:
|
|
cache_file.unlink()
|
|
return True
|
|
except Exception as e:
|
|
print(f"❌ Could not clear cache: {e}")
|
|
return False
|
|
return False
|
|
|
|
def _show_menu(self):
|
|
"""Display the interactive menu options"""
|
|
print(f"\n🎭 Character Explorer Menu")
|
|
print("=" * 30)
|
|
print(f"1. 💬 Ask {self.agent.character.name} a question")
|
|
print(f"2. 💬 Chat with {self.agent.character.name}")
|
|
print("3. 📚 List all memories")
|
|
print("4. 🔍 View specific memory (with related)")
|
|
print("5. 📊 Memory statistics")
|
|
print("6. 👤 Character summary")
|
|
print("7. 🔄 Regenerate character")
|
|
print("8. 🚪 Exit")
|
|
|
|
async def _handle_ask_question(self):
|
|
"""Handle asking questions to the character"""
|
|
question = input(f"\n💬 Ask {self.agent.character.name}: ").strip()
|
|
if not question:
|
|
return
|
|
|
|
print(f"\n🤔 {self.agent.character.name} is thinking...")
|
|
try:
|
|
response = await self.agent.react_to_situation(question)
|
|
print(f"💬 {self.agent.character.name}: {response}")
|
|
|
|
# Show which memories were retrieved for this response
|
|
relevant_memories = await self.agent.memory_stream.retrieve_related_memories(question, k=5)
|
|
print(f"\n🧠 Memories used for this response:")
|
|
for i, mem in enumerate(relevant_memories, 1):
|
|
print(f" {i}. [{mem.memory_type}] {mem.description[:80]}{'...' if len(mem.description) > 80 else ''}")
|
|
|
|
except Exception as e:
|
|
print(f"❌ Error getting response: {e}")
|
|
|
|
async def _handle_chat(self):
|
|
"""Handle chat with a character"""
|
|
if not self.chat_history:
|
|
scenario_content = input(f"\n💬 Set a scenario for a new chat with {self.agent.character.name}: ").strip()
|
|
if not scenario_content:
|
|
return
|
|
|
|
self.chat_history.append({'role': 'system', 'content': scenario_content})
|
|
|
|
scenario_content = input(f"\n💬 Ask {self.agent.character.name}: ").strip()
|
|
if not scenario_content:
|
|
return
|
|
|
|
print(f"\n🤔 {self.agent.character.name} is thinking...")
|
|
try:
|
|
response = await self.agent.react_to_situation(scenario_content)
|
|
print(f"💬 {self.agent.character.name}: {response}")
|
|
|
|
# Show which memories were retrieved for this response
|
|
relevant_memories = await self.agent.memory_stream.retrieve_related_memories(scenario_content, k=5)
|
|
print(f"\n🧠 Memories used for this response:")
|
|
for i, mem in enumerate(relevant_memories, 1):
|
|
print(f" {i}. [{mem.memory_type}] {mem.description[:80]}{'...' if len(mem.description) > 80 else ''}")
|
|
|
|
except Exception as e:
|
|
print(f"❌ Error getting response: {e}")
|
|
|
|
async def _handle_list_memories(self):
|
|
"""List all memories with filtering options"""
|
|
print("\n📚 Memory Filter Options:")
|
|
print("1. All memories")
|
|
print("2. Observations only")
|
|
print("3. Reflections only")
|
|
print("4. Plans only")
|
|
print("5. By importance (high to low)")
|
|
print("6. By recency (newest first)")
|
|
|
|
filter_choice = input("Choose filter (1-6): ").strip()
|
|
|
|
# memories = self.agent.memory_stream.memories.copy()
|
|
|
|
if filter_choice == "2":
|
|
memories = [m for m in self.agent.memory_stream.memories if m.memory_type == "observation"]
|
|
title = "Observations"
|
|
elif filter_choice == "3":
|
|
memories = [m for m in self.agent.memory_stream.memories if m.memory_type == "reflection"]
|
|
title = "Reflections"
|
|
elif filter_choice == "4":
|
|
memories = [m for m in self.agent.memory_stream.memories if m.memory_type == "plan"]
|
|
title = "Plans"
|
|
elif filter_choice == "5":
|
|
memories = sorted(self.agent.memory_stream.memories, key=lambda m: m.importance_score, reverse=True)
|
|
title = "All Memories (by importance)"
|
|
elif filter_choice == "6":
|
|
memories = sorted(self.agent.memory_stream.memories, key=lambda m: m.creation_time, reverse=True)
|
|
title = "All Memories (by recency)"
|
|
else:
|
|
memories = self.agent.memory_stream.memories
|
|
title = "All Memories"
|
|
|
|
print(f"\n📋 {title} ({len(memories)} total):")
|
|
for i, memory in enumerate(memories):
|
|
age_hours = (memory.last_accessed - memory.creation_time).total_seconds() / 3600
|
|
print(
|
|
f"[#{i:3d}] [{memory.memory_type[:4]}] [imp:{memory.importance_score}] [age:{age_hours:.1f}h] {memory.description}")
|
|
|
|
if len(memories) > 20:
|
|
print(f"\n... showing first 20 of {len(memories)} memories")
|
|
print("💡 Tip: Use option 3 to view specific memories in detail")
|
|
|
|
async def _handle_view_memory(self):
|
|
"""View a specific memory with its related memories"""
|
|
memory_num = int(input(f"\nEnter memory number (1-{len(self.agent.memory_stream.memories) - 1}): ").strip())
|
|
|
|
if 0 <= memory_num <= len(self.agent.memory_stream.memories) - 1:
|
|
memory = self.agent.memory_stream.memories[memory_num]
|
|
|
|
print(f"\n🔍 Memory #{memory_num} Details:")
|
|
print(f" Type: {memory.memory_type}")
|
|
print(f" Importance: {memory.importance_score}/10")
|
|
print(f" Created: {memory.creation_time.strftime('%Y-%m-%d %H:%M:%S')}")
|
|
print(f" Last accessed: {memory.last_accessed.strftime('%Y-%m-%d %H:%M:%S')}")
|
|
print(f" Description: {memory.description}")
|
|
|
|
# Show related memories using embeddings
|
|
if memory.memory_type == 'observation' or memory.memory_type == 'plan':
|
|
print(f"\n🔗 Related memories:")
|
|
|
|
for rel_mem in memory.related_memories:
|
|
rel_index = self.agent.memory_stream.memories.index(rel_mem)
|
|
print(
|
|
f" [#{rel_index:3d}] [{rel_mem.memory_type}] {rel_mem.description}")
|
|
else:
|
|
print(f"❌ Invalid memory number. Range: 1-{len(self.agent.memory_stream.memories)}")
|
|
|
|
async def _handle_memory_stats(self):
|
|
"""Show detailed memory statistics"""
|
|
memories = self.agent.memory_stream.memories
|
|
|
|
print(f"\n📊 Memory Statistics for {self.agent.character.name}:")
|
|
print(f" Total memories: {len(memories)}")
|
|
|
|
# Breakdown by type
|
|
by_type = {}
|
|
for mem in memories:
|
|
by_type[mem.memory_type] = by_type.get(mem.memory_type, 0) + 1
|
|
|
|
print(f"\n📂 Memory Types:")
|
|
for mem_type, count in by_type.items():
|
|
percentage = (count / len(memories)) * 100
|
|
print(f" {mem_type.title()}: {count} ({percentage:.1f}%)")
|
|
|
|
# Importance distribution
|
|
importance_scores = [m.importance_score for m in memories]
|
|
if importance_scores:
|
|
print(f"\n📈 Importance Distribution:")
|
|
print(f" Average: {sum(importance_scores) / len(importance_scores):.1f}")
|
|
print(f" Range: {min(importance_scores)}-{max(importance_scores)}")
|
|
|
|
# Visual histogram
|
|
print(f" Score distribution:")
|
|
for score in range(1, 11):
|
|
count = importance_scores.count(score)
|
|
bar = "█" * max(1, count // 2) if count > 0 else ""
|
|
print(f" {score:2d}: {count:2d} {bar}")
|
|
|
|
# Recency info
|
|
if memories:
|
|
oldest = min(memories, key=lambda m: m.creation_time)
|
|
newest = max(memories, key=lambda m: m.creation_time)
|
|
print(f"\n⏰ Time Span:")
|
|
print(f" Oldest: {oldest.creation_time.strftime('%Y-%m-%d %H:%M')}")
|
|
print(f" Newest: {newest.creation_time.strftime('%Y-%m-%d %H:%M')}")
|
|
|
|
async def _handle_character_summary(self):
|
|
"""Generate and show character summary"""
|
|
print(f"\n👤 Generating summary for {self.agent.character.name}...")
|
|
try:
|
|
# summary = await self.agent.get_summary()
|
|
print(f"\n📝 AI-Generated Character Summary:")
|
|
# print(f" {summary}")
|
|
|
|
print(f"\n📋 Structured Character Data:")
|
|
print(f" Name: {self.agent.character.name}")
|
|
print(f" Age: {self.agent.character.age}")
|
|
print(f" Personality: {self.agent.character.personality}")
|
|
print(f" Occupation: {self.agent.character.occupation}")
|
|
print(f" Location: {self.agent.character.location}")
|
|
print("")
|
|
print(f" - File: {self.agent.character.template_file}")
|
|
print(f" Traits")
|
|
traits_summary = "\n".join([f" - {trait.strength}/10 {trait.name} ({trait.description})" for trait in self.agent.character.traits]) if self.agent.character.traits else "No traits yet."
|
|
print(traits_summary)
|
|
if self.agent.character.relationships:
|
|
print(f" Relationships:")
|
|
for person, relationship in self.agent.character.relationships.items():
|
|
print(f" • {person}: {relationship}")
|
|
|
|
if self.agent.character.goals:
|
|
print(f" Goals:")
|
|
for goal in self.agent.character.goals:
|
|
print(f" • {goal}")
|
|
|
|
except Exception as e:
|
|
print(f"❌ Error generating summary: {e}")
|
|
|
|
async def _handle_regenerate_character(self) -> bool:
|
|
"""Handle regenerating the current character"""
|
|
character_name = self.agent.character.name
|
|
|
|
print(f"\n🔄 Regenerate {character_name}")
|
|
print("This will clear the cache and recreate the character from scratch.")
|
|
print("All current memories and traits will be lost!")
|
|
|
|
# Confirm regeneration
|
|
confirm = input(f"Regenerate {character_name}? (y/N): ").strip().lower()
|
|
if confirm not in ['y', 'yes']:
|
|
print("❌ Regeneration cancelled")
|
|
return False
|
|
|
|
template_file = self.agent.character.template_file
|
|
if self.agent.character.template_file is None:
|
|
print(f"❌ Could not find template file for {character_name}")
|
|
return False
|
|
|
|
# Clear the cache
|
|
if self.clear_character_cache():
|
|
print(f"🗑️ Cleared cache for {character_name}")
|
|
else:
|
|
print("❌ Failed to clear cache")
|
|
return False
|
|
|
|
await load_and_explore_character(template_file)
|
|
return True
|
|
|
|
|
|
def select_character_template(show_cache_info: bool = True) -> Path:
|
|
"""Show character template selection menu and return chosen template path"""
|
|
|
|
templates_dir = Path(__file__).parent / 'character_templates'
|
|
|
|
# Find all .yml files in the character_templates directory
|
|
template_files = list(templates_dir.glob('*.yml'))
|
|
|
|
if not template_files:
|
|
print("❌ No character templates found in character_templates/")
|
|
return None
|
|
|
|
print("\n🎭 Available Character Templates:")
|
|
print("=" * 60)
|
|
|
|
# Display template options with cache info
|
|
for i, template_file in enumerate(template_files, 1):
|
|
# Try to load the template to get the name
|
|
try:
|
|
with open(template_file, 'r', encoding='utf-8') as f:
|
|
template_data = yaml.safe_load(f)
|
|
character_name = template_data.get('name', template_file.stem)
|
|
except:
|
|
character_name = template_file.stem
|
|
|
|
# Check for cached version if requested
|
|
if show_cache_info:
|
|
cache_info = get_cache_info(template_file)
|
|
if cache_info:
|
|
cache_status = f"💾 (cached {cache_info['cached_at'].strftime('%m/%d %H:%M')})"
|
|
else:
|
|
cache_status = "🔄 (will create new)"
|
|
print(f"{i}. {character_name} ({template_file.name}) {cache_status}")
|
|
else:
|
|
print(f"{i}. {character_name} ({template_file.name})")
|
|
|
|
print(f"{len(template_files) + 1}. 🚪 Exit")
|
|
|
|
# Get user choice
|
|
while True:
|
|
try:
|
|
choice = input(f"\nChoose a character template (1-{len(template_files) + 1}): ").strip()
|
|
choice_num = int(choice)
|
|
|
|
if choice_num == len(template_files) + 1:
|
|
return None # Exit choice
|
|
elif 1 <= choice_num <= len(template_files):
|
|
return template_files[choice_num - 1]
|
|
else:
|
|
print(f"❌ Invalid choice. Please enter 1-{len(template_files) + 1}.")
|
|
except ValueError:
|
|
print(f"❌ Please enter a valid number (1-{len(template_files) + 1}).")
|
|
except KeyboardInterrupt:
|
|
return None
|
|
|
|
|
|
async def load_and_explore_character(template_path: Path):
|
|
"""Load a character template and start exploration"""
|
|
|
|
# Load environment
|
|
load_dotenv()
|
|
|
|
# Try to load from cache first
|
|
print(f"📁 Loading character from {template_path.name}...")
|
|
agent = load_character_from_cache(template_path)
|
|
|
|
if agent is None:
|
|
# No cache or cache invalid, create from template
|
|
try:
|
|
with open(template_path, 'r', encoding='utf-8') as f:
|
|
template = yaml.safe_load(f)
|
|
template['yaml_file'] = template_path
|
|
except Exception as e:
|
|
print(f"❌ Error loading template: {e}")
|
|
return None
|
|
|
|
print(f"🤖 Creating memories and scoring importance...")
|
|
|
|
try:
|
|
# Create character agent from template
|
|
agent = await CharacterAgent.create_from_template(template)
|
|
print(f"✅ {agent.character.name} created successfully!")
|
|
|
|
# Save to cache for next time
|
|
save_character_to_cache(agent, template_path)
|
|
|
|
except Exception as e:
|
|
print(f"❌ Error creating character: {e}")
|
|
import traceback
|
|
traceback.print_exc()
|
|
return None
|
|
else:
|
|
print(f"✅ {agent.character.name} loaded successfully!")
|
|
|
|
# Start explorer
|
|
explorer = CharacterExplorer(agent)
|
|
await explorer.start_interactive_session()
|
|
|
|
return agent
|
|
|
|
|
|
async def main():
|
|
"""Main function with character selection"""
|
|
print("🎭 Welcome to the Living Agents Character Explorer!")
|
|
|
|
selected_template = select_character_template(show_cache_info=True)
|
|
|
|
if selected_template is None:
|
|
print("👋 Goodbye!")
|
|
return
|
|
|
|
# Load and explore the selected character
|
|
await load_and_explore_character(selected_template)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
asyncio.run(main())
|