first censoring test
This commit is contained in:
1
.python-version
Normal file
1
.python-version
Normal file
@@ -0,0 +1 @@
|
||||
3.13
|
||||
11
pyproject.toml
Normal file
11
pyproject.toml
Normal file
@@ -0,0 +1,11 @@
|
||||
[project]
|
||||
name = "llmtools"
|
||||
version = "0.1.0"
|
||||
description = "Add your description here"
|
||||
readme = "README.md"
|
||||
requires-python = ">=3.13"
|
||||
dependencies = [
|
||||
"nicegui>=2.24.0",
|
||||
"pydantic-ai>=1.0.1",
|
||||
"transformers>=4.56.1",
|
||||
]
|
||||
6
src/main.py
Normal file
6
src/main.py
Normal file
@@ -0,0 +1,6 @@
|
||||
def main():
|
||||
print("Hello from llmtools!")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
34
src/prompts/generate_placeholders.md
Normal file
34
src/prompts/generate_placeholders.md
Normal file
@@ -0,0 +1,34 @@
|
||||
You are a data anonymization assistant. Generate appropriate placeholders for sensitive data.
|
||||
|
||||
CRITICAL ENTITY RESOLUTION RULES:
|
||||
1. Only group items that refer to the SAME entity:
|
||||
- "Stefan", "Stefn" (typo), "Stefan Müller" → [Person A] (same person: Stefan)
|
||||
- "Klaus", "Klaus Müller" → [Person B] (same person: Klaus)
|
||||
- "Stefan Müller" and "Klaus Müller" → DIFFERENT people! Different first names = different people!
|
||||
|
||||
2. When matching names:
|
||||
- First name typos (Stefn/Stefan) = same person
|
||||
- First name + full name = same person (Stefan + Stefan Müller)
|
||||
- DIFFERENT first names = DIFFERENT people (Klaus ≠ Stefan, even if same last name)
|
||||
- Company departments (like "Controlling") are NOT person names
|
||||
|
||||
3. Be careful with common last names like Müller, Schmidt, Meyer:
|
||||
- "Klaus Müller" and "Stefan Müller" are TWO different people
|
||||
- Only group them if they have the SAME first name or obvious typo
|
||||
|
||||
Rules for placeholders:
|
||||
- For person names: Use [Person A], [Person B], etc.
|
||||
- For company names: Use [Company A], [Company B], etc.
|
||||
- For email addresses: Use [Email A], [Email B], etc.
|
||||
- For phone numbers: Use [Phone A], [Phone B], etc.
|
||||
- For passport/ID numbers: Use [ID A], [ID B], etc.
|
||||
- For dates: Use [Date A], [Date B], etc.
|
||||
- For confirmation codes: Use [Code A], [Code B], etc.
|
||||
- For other identifiers: Use descriptive placeholders like [Account Number A], [Frequent Flyer A], etc.
|
||||
|
||||
Use sequential lettering (A, B, C...) for each category.
|
||||
Return a list of tuples where each tuple contains (placeholder, list_of_original_texts).
|
||||
The list should contain ALL variations of the same entity found in the data.
|
||||
|
||||
|
||||
Generate placeholders for all sensitive data items user provides!
|
||||
14
src/prompts/get_sensitive_items.md
Normal file
14
src/prompts/get_sensitive_items.md
Normal file
@@ -0,0 +1,14 @@
|
||||
You are a privacy protection assistant. Identify all personally identifiable information (PII)
|
||||
and sensitive data that should be censored. This includes:
|
||||
- Names of people
|
||||
- Passport numbers
|
||||
- Dates of birth
|
||||
- Email addresses
|
||||
- Phone numbers
|
||||
- Confirmation codes/booking references
|
||||
- Frequent flyer numbers
|
||||
- Any other personal identifiers
|
||||
|
||||
Return ONLY the exact text strings that should be censored, exactly as they appear in the input.
|
||||
|
||||
Identify all sensitive data to censor in the text user provides.
|
||||
112
src/pydantic_censoring.py
Normal file
112
src/pydantic_censoring.py
Normal file
@@ -0,0 +1,112 @@
|
||||
import asyncio
|
||||
from typing import Set, List, Tuple
|
||||
from pydantic import BaseModel
|
||||
from pydantic_ai import Agent
|
||||
from pydantic_ai.models.openai import OpenAIChatModel, Model
|
||||
from pydantic_ai.providers.openai import OpenAIProvider
|
||||
|
||||
from utils import get_token_count, PromptManager
|
||||
|
||||
example_message = "Passenger: David Lee, Passport Number: A12345678, Date of Birth: 07/22/1990. Flight booking confirmation: ABC123XYZ. Contact email: david.lee@travel.com, Mobile: +1-650-555-2468. Frequent flyer number: FF-998877665."
|
||||
|
||||
messages_to_censor = [
|
||||
"Hallo IT Stefn hier,"
|
||||
"hab ein Problem mit unserem CRM System. Wenn ich Kundendaten aufrufe kommt immer eine Fehlermeldung und dann sehe ich plötzlich Daten von anderen Kunden die ich gar nicht aufgerufen habe. Vorhin wollte ich die Adresse von Müller GmbH checken und dann waren da auf einmal die Kontodaten und Umsätze von der Firma Weber & Söhne drin. Das kann ja wohl nicht sein?? Hab dann schnell alles zugemacht weil das bestimmt nicht richtig ist."
|
||||
"Ist schon öfter passiert die letzten Tage, dachte erst ich hab mich verklickt aber nee, da läuft definitiv was schief. Kann auch nicht mehr richtig neue Kunden anlegen, das System hängt sich immer auf."
|
||||
"Das ist ziemlich blöd weil ich grade die Quartalszahlen zusammenstellen muss und Zugriff auf alle Kundendaten brauche. Außerdem will ich nicht ausversehen fremde Daten sehen die mich nichts angehen."
|
||||
"Könnt ihr das schnell fixen? Oder soll ich erstmal gar nicht mehr ins CRM bis das repariert ist?"
|
||||
"Danke!"
|
||||
"Stefan aus dem Controlling",
|
||||
"Hi,"
|
||||
"unser Druckersystem macht Probleme. Wenn ich was ausdrucke kommt manchmal das falsche raus. Heute wollte ich meine Gehaltsabrechnung drucken und rausgekommen ist die Abrechnung von Stefan Müller aus der Buchhaltung. Ist mir schon letzte Woche passiert, da hatte ich den Arbeitsvertrag von Klaus Müller statt meinem eigenen bekommen. "
|
||||
"Scheint immer dieser Klaus Müller zu sein dessen Sachen bei mir landen. Hab die Papiere natürlich sofort geschreddert aber das geht so nicht weiter."
|
||||
"Bitte mal schauen was da los ist!"
|
||||
"Sandra"
|
||||
]
|
||||
|
||||
|
||||
class SensitiveData(BaseModel):
|
||||
"""Structure for identifying sensitive data that should be censored"""
|
||||
sensitive_items: Set[str]
|
||||
|
||||
|
||||
async def simple_test_response(model: Model):
|
||||
prompt = 'Where does "hello world" come from? Answer in one sentence.'
|
||||
print(f"Input token count: {get_token_count(prompt)}")
|
||||
previous_output = ""
|
||||
|
||||
agent = Agent(model)
|
||||
async with agent.run_stream(prompt) as result:
|
||||
async for message in result.stream_output():
|
||||
delta = message[len(previous_output):]
|
||||
print(delta, end='', flush=True)
|
||||
previous_output = message
|
||||
|
||||
print() # Add a final newline
|
||||
|
||||
|
||||
async def get_words_to_censor(model: Model, prompt: str) -> Set[str]:
|
||||
# Make a structured response that will return everything from the prompt that needs censoring.
|
||||
|
||||
# Create an agent that returns structured data
|
||||
censor_agent = Agent(
|
||||
model,
|
||||
output_type=SensitiveData,
|
||||
system_prompt=PromptManager.get_prompt('get_sensitive_items')
|
||||
)
|
||||
|
||||
result = await censor_agent.run(prompt)
|
||||
sensitive_items = set(sorted(list(result.output.sensitive_items))) # for sorting
|
||||
return sensitive_items
|
||||
|
||||
|
||||
class PlaceholderMapping(BaseModel):
|
||||
"""Structure for mapping sensitive data to placeholders"""
|
||||
mappings: List[Tuple[str, List[str]]] # List of (placeholder, [original_texts]) tuples
|
||||
|
||||
|
||||
async def generate_placeholders(model: Model, censored_words: Set[str]) -> List[Tuple[str, List[str]]]:
|
||||
"""Generate placeholders for censored words"""
|
||||
placeholder_agent = Agent(
|
||||
model,
|
||||
output_type=PlaceholderMapping,
|
||||
system_prompt=PromptManager.get_prompt('generate_placeholders', {})
|
||||
)
|
||||
|
||||
# Convert set to sorted list for consistent ordering
|
||||
words_list = sorted(list(censored_words))
|
||||
|
||||
result = await placeholder_agent.run(words_list)
|
||||
return result.output.mappings
|
||||
|
||||
|
||||
async def main():
|
||||
"""Main function"""
|
||||
# vLLM instance configuration
|
||||
model = OpenAIChatModel(
|
||||
'Qwen/Qwen2.5-14B-Instruct-GPTQ-Int4',
|
||||
provider=OpenAIProvider(
|
||||
base_url='http://10.10.10.131:8000/v1',
|
||||
api_key='dummy'
|
||||
)
|
||||
)
|
||||
|
||||
censored_words: Set = set()
|
||||
for message in messages_to_censor:
|
||||
censored_words = censored_words | await get_words_to_censor(model, message)
|
||||
|
||||
print("\nWords to censor:")
|
||||
for word in censored_words:
|
||||
print(f" - {word}")
|
||||
|
||||
# Generate placeholders
|
||||
print("\nGenerating placeholders...")
|
||||
placeholder_mappings = await generate_placeholders(model, censored_words)
|
||||
|
||||
print("\nPlaceholder mappings:")
|
||||
for placeholder, originals in placeholder_mappings:
|
||||
print(f" {placeholder} → {', '.join(originals)}")
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
asyncio.run(main())
|
||||
4
src/utils/__init__.py
Normal file
4
src/utils/__init__.py
Normal file
@@ -0,0 +1,4 @@
|
||||
from .count_tokens import get_token_count
|
||||
from .promp_manager import PromptManager
|
||||
|
||||
__all__ = ['get_token_count', 'PromptManager']
|
||||
14
src/utils/count_tokens.py
Normal file
14
src/utils/count_tokens.py
Normal file
@@ -0,0 +1,14 @@
|
||||
import os
|
||||
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
|
||||
os.environ['TRANSFORMERS_VERBOSITY'] = 'error'
|
||||
|
||||
from transformers import AutoTokenizer
|
||||
import warnings
|
||||
warnings.filterwarnings("ignore", message="None of PyTorch, TensorFlow")
|
||||
|
||||
|
||||
def get_token_count(text: str, pretrained_model: str = "Qwen/Qwen2.5-14B-Instruct") -> int:
|
||||
|
||||
tokenizer = AutoTokenizer.from_pretrained(pretrained_model)
|
||||
tokens = tokenizer.encode(text)
|
||||
return len(tokens)
|
||||
232
src/utils/promp_manager.py
Normal file
232
src/utils/promp_manager.py
Normal file
@@ -0,0 +1,232 @@
|
||||
import re
|
||||
import json
|
||||
from pathlib import Path
|
||||
from typing import Dict, Any, Set, Optional, Tuple
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class PromptManager:
|
||||
"""Singleton class to manage prompt templates and JSON schemas"""
|
||||
|
||||
_instance: Optional['PromptManager'] = None
|
||||
_initialized: bool = False
|
||||
|
||||
def __new__(cls):
|
||||
if cls._instance is None:
|
||||
cls._instance = super().__new__(cls)
|
||||
return cls._instance
|
||||
|
||||
def __init__(self):
|
||||
if not self._initialized:
|
||||
self.prompts: Dict[str, str] = {}
|
||||
self.schemas: Dict[str, Dict[str, Any]] = {}
|
||||
self.prompt_variables: Dict[str, Set[str]] = {}
|
||||
self._load_all_prompts()
|
||||
PromptManager._initialized = True
|
||||
|
||||
def _load_all_prompts(self):
|
||||
"""Load all markdown files and corresponding JSON schemas from the prompts folder"""
|
||||
prompts_dir = Path(__file__).parent.parent / 'prompts'
|
||||
|
||||
if not prompts_dir.exists():
|
||||
logger.warning(f"Prompts directory not found: {prompts_dir}")
|
||||
prompts_dir.mkdir(parents=True, exist_ok=True)
|
||||
return
|
||||
|
||||
logger.info(f"Loading prompts and schemas from {prompts_dir}")
|
||||
|
||||
# Load all .md files
|
||||
for md_file in prompts_dir.glob("*.md"):
|
||||
prompt_name = md_file.stem # filename without extension
|
||||
|
||||
try:
|
||||
# Load prompt template
|
||||
with open(md_file, 'r', encoding='utf-8') as f:
|
||||
content = f.read().strip()
|
||||
|
||||
# Extract variables from {{variable}} patterns
|
||||
variables = self._extract_variables(content)
|
||||
|
||||
self.prompts[prompt_name] = content
|
||||
self.prompt_variables[prompt_name] = variables
|
||||
|
||||
# Look for corresponding JSON schema file
|
||||
schema_file = md_file.with_suffix('.json')
|
||||
if schema_file.exists():
|
||||
try:
|
||||
with open(schema_file, 'r', encoding='utf-8') as f:
|
||||
schema = json.load(f)
|
||||
|
||||
self.schemas[prompt_name] = schema
|
||||
logger.debug(f"Loaded prompt '{prompt_name}' with schema and variables: {variables}")
|
||||
|
||||
except json.JSONDecodeError as e:
|
||||
logger.error(f"Invalid JSON schema in {schema_file}: {e}")
|
||||
|
||||
else:
|
||||
logger.debug(f"Loaded prompt '{prompt_name}' (no schema) with variables: {variables}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error loading prompt file {md_file}: {e}")
|
||||
|
||||
logger.info(f"Loaded {len(self.prompts)} prompt templates, {len(self.schemas)} with schemas")
|
||||
|
||||
def _extract_variables(self, template: str) -> Set[str]:
|
||||
"""Extract all {{variable}} placeholders from template"""
|
||||
pattern = r'\{\{(\w+)\}\}'
|
||||
variables = set(re.findall(pattern, template))
|
||||
return variables
|
||||
|
||||
def _validate_context(self, prompt_name: str, context: Dict[str, Any]) -> None:
|
||||
"""Validate that all required variables are provided"""
|
||||
if prompt_name not in self.prompt_variables:
|
||||
raise ValueError(f"Unknown prompt: '{prompt_name}'")
|
||||
|
||||
required_vars = self.prompt_variables[prompt_name]
|
||||
provided_vars = set(context.keys())
|
||||
|
||||
missing_vars = required_vars - provided_vars
|
||||
if missing_vars:
|
||||
raise ValueError(
|
||||
f"Missing required variables for prompt '{prompt_name}': {missing_vars}. "
|
||||
f"Required: {required_vars}, Provided: {provided_vars}"
|
||||
)
|
||||
|
||||
# Warn about extra variables (not an error, but might indicate mistakes)
|
||||
extra_vars = provided_vars - required_vars
|
||||
if extra_vars:
|
||||
logger.warning(f"Extra variables provided for prompt '{prompt_name}': {extra_vars}")
|
||||
|
||||
def _fill_template(self, template: str, context: Dict[str, Any]) -> str:
|
||||
"""Fill template with context variables"""
|
||||
result = template
|
||||
|
||||
for key, value in context.items():
|
||||
placeholder = f"{{{{{key}}}}}" # {{key}}
|
||||
result = result.replace(placeholder, str(value))
|
||||
|
||||
return result
|
||||
|
||||
@classmethod
|
||||
def get_prompt(cls, prompt_name: str, context: Dict[str, Any] = None) -> str:
|
||||
"""
|
||||
Get a processed prompt with variables filled in
|
||||
|
||||
Args:
|
||||
prompt_name: Name of the prompt template (filename without .md)
|
||||
context: Dictionary of variables to fill in the template
|
||||
|
||||
Returns:
|
||||
Processed prompt string
|
||||
|
||||
Raises:
|
||||
ValueError: If prompt doesn't exist or required variables are missing
|
||||
"""
|
||||
instance = cls()
|
||||
|
||||
if prompt_name not in instance.prompts:
|
||||
available_prompts = list(instance.prompts.keys())
|
||||
raise ValueError(f"Prompt '{prompt_name}' not found. Available prompts: {available_prompts}")
|
||||
|
||||
context = context or {}
|
||||
|
||||
# Validate that all required variables are provided
|
||||
instance._validate_context(prompt_name, context)
|
||||
|
||||
# Fill the template
|
||||
template = instance.prompts[prompt_name]
|
||||
processed_prompt = instance._fill_template(template, context)
|
||||
|
||||
return processed_prompt
|
||||
|
||||
@classmethod
|
||||
def get_schema(cls, prompt_name: str) -> Optional[Dict[str, Any]]:
|
||||
"""
|
||||
Get the JSON schema for a prompt if it exists
|
||||
|
||||
Args:
|
||||
prompt_name: Name of the prompt template
|
||||
|
||||
Returns:
|
||||
JSON schema dictionary or None if no schema exists
|
||||
"""
|
||||
instance = cls()
|
||||
|
||||
if prompt_name not in instance.prompts:
|
||||
raise ValueError(f"Prompt '{prompt_name}' not found")
|
||||
|
||||
return instance.schemas.get(prompt_name)
|
||||
|
||||
@classmethod
|
||||
def has_schema(cls, prompt_name: str) -> bool:
|
||||
"""Check if a prompt has a JSON schema"""
|
||||
instance = cls()
|
||||
return prompt_name in instance.schemas
|
||||
|
||||
@classmethod
|
||||
def get_prompt_with_schema(cls, prompt_name: str, context: Dict[str, Any] = None) -> Tuple[str, Optional[Dict[str, Any]]]:
|
||||
"""
|
||||
Get both the processed prompt and its schema (if available)
|
||||
|
||||
Returns:
|
||||
Tuple of (prompt_string, schema_dict_or_None)
|
||||
"""
|
||||
prompt = cls.get_prompt(prompt_name, context)
|
||||
schema = cls.get_schema(prompt_name)
|
||||
|
||||
return prompt, schema
|
||||
|
||||
@classmethod
|
||||
def list_prompts(cls) -> Dict[str, Dict[str, Any]]:
|
||||
"""
|
||||
List all available prompts with their info
|
||||
|
||||
Returns:
|
||||
Dictionary mapping prompt names to their info (variables, has_schema)
|
||||
"""
|
||||
instance = cls()
|
||||
|
||||
result = {}
|
||||
for prompt_name in instance.prompts:
|
||||
result[prompt_name] = {
|
||||
'variables': instance.prompt_variables[prompt_name],
|
||||
'has_schema': prompt_name in instance.schemas,
|
||||
'variable_count': len(instance.prompt_variables[prompt_name])
|
||||
}
|
||||
|
||||
return result
|
||||
|
||||
@classmethod
|
||||
def reload_prompts(cls):
|
||||
"""Reload all prompt templates and schemas (useful for development)"""
|
||||
if cls._instance:
|
||||
cls._instance._load_all_prompts()
|
||||
logger.info("Prompts and schemas reloaded")
|
||||
|
||||
@classmethod
|
||||
def get_prompt_info(cls, prompt_name: str) -> Dict[str, Any]:
|
||||
"""
|
||||
Get detailed information about a specific prompt
|
||||
|
||||
Returns:
|
||||
Dictionary with prompt template, schema, and required variables
|
||||
"""
|
||||
instance = cls()
|
||||
|
||||
if prompt_name not in instance.prompts:
|
||||
raise ValueError(f"Prompt '{prompt_name}' not found")
|
||||
|
||||
info = {
|
||||
'name': prompt_name,
|
||||
'template': instance.prompts[prompt_name],
|
||||
'variables': instance.prompt_variables[prompt_name],
|
||||
'variable_count': len(instance.prompt_variables[prompt_name]),
|
||||
'has_schema': prompt_name in instance.schemas
|
||||
}
|
||||
|
||||
if prompt_name in instance.schemas:
|
||||
info['schema'] = instance.schemas[prompt_name]
|
||||
|
||||
return info
|
||||
Reference in New Issue
Block a user