from typing import List, Dict, Literal
from living_agents import LLMAgent, Character, CharacterAgent, Memory
import logging
from datetime import datetime, timedelta

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)


class RoleplaySystem:
    """Coordinate a set of character agents that share one scene and clock.

    Each registered ``Character`` is mapped to its own ``CharacterAgent``.
    The system forwards user input and inter-character dialogue to those
    agents and keeps a small amount of shared scene state.
    """

    # Character -> agent driving that character.
    agents: Dict[Character, CharacterAgent]
    # Scene clock; starts at construction time and moves via advance_time().
    global_time: datetime
    # Free-form description of the current scene (location, time, ...).
    scene_state: Dict

    def __init__(self):
        """Create an empty system with the default coffee-shop scene."""
        self.agents = {}
        self.global_time = datetime.now()
        self.scene_state = {
            "location": "cozy coffee shop",
            "time": "afternoon",
            "atmosphere": "quiet and peaceful",
            "active_conversations": [],
            "events": [],
        }

    async def setup_characters(self, characters: List[Character]):
        """Create an agent for every character and initialize its memories.

        Args:
            characters: Characters to register. A character that is already
                registered gets a fresh agent that replaces the old one.
        """
        logger.info('Setting up Characters.')
        for character in characters:
            agent = CharacterAgent(character, LLMAgent(temperature=0.9))
            self.agents[character] = agent
            await agent.initialize_memories()

    async def get_character_response(self, character: Character, user_input: str) -> str:
        """Let ``character`` perceive ``user_input`` and return its reaction.

        Args:
            character: A character previously passed to setup_characters().
            user_input: The text addressed to the character.

        Returns:
            The agent's reaction to ``user_input``.

        Raises:
            KeyError: If ``character`` has not been registered.
        """
        print(f"🧠 {character} accessing memories...")
        agent = self.agents[character]

        # Agent perceives user interaction
        await agent.perceive(f"Someone asked me: '{user_input}'")

        # Generate response
        return await agent.react_to_situation(user_input)

    async def character_chat(self, character_1: Character, character_2: Character, context: str) -> str:
        """Make two characters interact and format the exchange as text.

        Args:
            character_1: The character that speaks first.
            character_2: The character that replies.
            context: Description of the situation they are talking in.

        Returns:
            A two-line transcript, one line per character.

        Raises:
            KeyError: If either character has not been registered.
        """
        interaction = await self.character_interaction(character_1, character_2, context)
        # The speech-balloon prefix was previously stored as mis-decoded
        # bytes; it is restored to the intended emoji here.
        return (
            f"\n💬 **{character_1}**: {interaction[character_1]}"
            f"\n💬 **{character_2}**: {interaction[character_2]}\n"
        )

    async def advance_time(self, hours: int = 1):
        """Advance the scene clock and tell every agent the new time.

        Args:
            hours: Number of hours to move ``global_time`` forward.
        """
        self.global_time += timedelta(hours=hours)
        self.scene_state["time"] = self.global_time.strftime("%I:%M %p")

        # Every agent perceives the new time as an observation. Only the
        # agents are needed, so iterate the dict's values.
        for agent in self.agents.values():
            await agent.perceive(f"Time is now {self.scene_state['time']}")

    def get_character_memories(self, character: Character,
                               memory_type: Literal['all', 'observation', 'reflection', 'plan'] = "all"
                               ) -> List[Memory]:
        """Return the memories stored in ``character``'s memory stream.

        Args:
            character: A registered character.
            memory_type: Requested memory category.

        Returns:
            The agent's memory list (the same list object the agent holds,
            not a copy).

        Raises:
            KeyError: If ``character`` has not been registered.
        """
        # NOTE(review): memory_type is accepted but not applied; every
        # memory is returned regardless of its value. Filtering needs the
        # Memory field that carries the category -- confirm its name against
        # living_agents.Memory before implementing.
        return self.agents[character].memory_stream.memories

    async def get_character_summary(self, character: Character) -> str:
        """Return the agent-generated summary of ``character`` as text.

        Raises:
            KeyError: If ``character`` has not been registered.
        """
        summary = await self.agents[character].get_summary()
        # The memo prefix was previously stored as mis-decoded bytes; it is
        # restored to the intended emoji here.
        return f"\n📝 Current summary of {character}:\n{summary}\n"

    async def character_interaction(self, character_1: Character, character_2: Character,
                                    context: str) -> Dict[Character, str]:
        """Run one exchange between two characters.

        ``character_1`` reacts to ``context`` first; ``character_2`` then
        reacts to what ``character_1`` said. Both agents perceive the context
        beforehand and the finished exchange afterwards.

        Args:
            character_1: The character that speaks first.
            character_2: The character that replies.
            context: Description of the situation they are talking in.

        Returns:
            A dict mapping each character to what it said.

        Raises:
            KeyError: If either character has not been registered.
        """
        char1_agent = self.agents[character_1]
        char2_agent = self.agents[character_2]

        # Both characters observe the interaction context
        await char1_agent.perceive(f"Interacting with {character_2}: {context}")
        await char2_agent.perceive(f"Interacting with {character_1}: {context}")

        # Generate responses; the second reply depends on the first one.
        char1_response = await char1_agent.react_to_situation(
            f"You are talking with {character_2}. Context: {context}"
        )
        char2_response = await char2_agent.react_to_situation(
            f"{character_1} said: '{char1_response}'"
        )

        # Both remember the conversation
        await char1_agent.perceive(
            f"Conversation with {character_2}: "
            f"I said '{char1_response}', they replied '{char2_response}'"
        )
        await char2_agent.perceive(
            f"Conversation with {character_1}: "
            f"They said '{char1_response}', I replied '{char2_response}'"
        )

        return {
            character_1: char1_response,
            character_2: char2_response,
        }