Files
chloeclaw/skills/agentmail/references/EXAMPLES.md
2026-03-02 20:22:49 +00:00

14 KiB
Raw Blame History

AgentMail Usage Examples

Common patterns and use cases for AgentMail in AI agent workflows.

Basic Agent Email Setup

1. Create Agent Identity

from agentmail import AgentMail
import os

client = AgentMail(api_key=os.getenv("AGENTMAIL_API_KEY"))

# Create inbox for your agent
agent_inbox = client.inboxes.create(
    username="spike-assistant",
    display_name="Spike - AI Assistant",
    client_id="spike-main-inbox"  # Prevents duplicates
)

print(f"Agent email: {agent_inbox.inbox_id}")
# Output: spike-assistant@agentmail.to

2. Send Status Updates

def send_task_completion(task_name, details, recipient):
    client.inboxes.messages.send(
        inbox_id="spike-assistant@agentmail.to",
        to=recipient,
        subject=f"Task Completed: {task_name}",
        text=f"Hello! I've completed the task: {task_name}\n\nDetails:\n{details}\n\nBest regards,\nSpike 🦝",
        html=f"""
        <p>Hello!</p>
        <p>I've completed the task: <strong>{task_name}</strong></p>
        <h3>Details:</h3>
        <p>{details.replace(chr(10), '<br>')}</p>
        <p>Best regards,<br>Spike 🦝</p>
        """
    )

# Usage
send_task_completion(
    "PDF Processing", 
    "Rotated 5 pages, extracted text, and saved output to /tmp/processed.pdf",
    "adam@example.com"
)

Customer Support Automation

Auto-Reply System

def setup_support_auto_reply():
    """Set up webhook to auto-reply to support emails"""
    
    # Create support inbox
    support_inbox = client.inboxes.create(
        username="support",
        display_name="Customer Support",
        client_id="support-inbox"
    )
    
    # Register webhook for auto-replies
    webhook = client.webhooks.create(
        url="https://your-app.com/webhook/support",
        event_types=["message.received"],
        inbox_ids=[support_inbox.inbox_id],
        client_id="support-webhook"
    )
    
    return support_inbox, webhook

def handle_support_message(message):
    """Process incoming support message and send auto-reply"""
    
    subject = message['subject'].lower()
    sender = message['from'][0]['email']
    
    # Determine response based on subject keywords
    if 'billing' in subject or 'payment' in subject:
        response = """
        Thank you for your billing inquiry. 
        
        Our billing team will review your request and respond within 24 hours. 
        For urgent billing issues, please call 1-800-SUPPORT.
        
        Best regards,
        Customer Support Team
        """
    elif 'bug' in subject or 'error' in subject:
        response = """
        Thank you for reporting this issue.
        
        Our technical team has been notified and will investigate. 
        We'll update you within 48 hours with our findings.
        
        If you have additional details, please reply to this email.
        
        Best regards,
        Technical Support
        """
    else:
        response = """
        Thank you for contacting us!
        
        We've received your message and will respond within 24 hours.
        For urgent issues, please call our support line.
        
        Best regards,
        Customer Support Team
        """
    
    # Send auto-reply
    client.inboxes.messages.send(
        inbox_id=message['inbox_id'],
        to=sender,
        subject=f"Re: {message['subject']}",
        text=response
    )
    
    # Log for human follow-up
    print(f"Auto-replied to {sender} about: {message['subject']}")

Document Processing Workflow

Email → Process → Reply

import base64
import tempfile
from pathlib import Path

def process_pdf_attachment(message):
    """Extract attachments, process PDFs, and reply with results"""
    
    processed_files = []
    
    for attachment in message.get('attachments', []):
        if attachment['content_type'] == 'application/pdf':
            # Decode attachment
            pdf_data = base64.b64decode(attachment['content'])
            
            # Save to temp file
            with tempfile.NamedTemporaryFile(suffix='.pdf', delete=False) as tmp:
                tmp.write(pdf_data)
                temp_path = tmp.name
            
            try:
                # Process PDF (example: extract text)
                extracted_text = extract_pdf_text(temp_path)
                
                # Save processed result
                output_path = f"/tmp/processed_{attachment['filename']}.txt"
                with open(output_path, 'w') as f:
                    f.write(extracted_text)
                
                processed_files.append({
                    'original': attachment['filename'],
                    'output': output_path,
                    'preview': extracted_text[:200] + '...'
                })
                
            finally:
                Path(temp_path).unlink()  # Clean up temp file
    
    if processed_files:
        # Send results back
        results_text = "\n".join([
            f"Processed {f['original']}:\n{f['preview']}\n"
            for f in processed_files
        ])
        
        # Attach processed files
        attachments = []
        for f in processed_files:
            with open(f['output'], 'r') as file:
                content = base64.b64encode(file.read().encode()).decode()
            attachments.append({
                'filename': Path(f['output']).name,
                'content': content,
                'content_type': 'text/plain'
            })
        
        client.inboxes.messages.send(
            inbox_id=message['inbox_id'],
            to=message['from'][0]['email'],
            subject=f"Re: {message['subject']} - Processed",
            text=f"I've processed your PDF files:\n\n{results_text}",
            attachments=attachments
        )

def extract_pdf_text(pdf_path):
    """Extract text from PDF file"""
    # Implementation depends on your PDF library
    # Example with pdfplumber:
    import pdfplumber
    text = ""
    with pdfplumber.open(pdf_path) as pdf:
        for page in pdf.pages:
            text += page.extract_text() + "\n"
    return text

Task Assignment and Tracking

Email-Based Task Management

def create_task_tracker_inbox():
    """Set up inbox for task assignments via email"""
    
    inbox = client.inboxes.create(
        username="tasks",
        display_name="Task Assignment Bot",
        client_id="task-tracker"
    )
    
    # Webhook for processing task emails
    webhook = client.webhooks.create(
        url="https://your-app.com/webhook/tasks",
        event_types=["message.received"],
        inbox_ids=[inbox.inbox_id]
    )
    
    return inbox

def process_task_assignment(message):
    """Parse email and create task from content"""
    
    subject = message['subject']
    body = message.get('text', '')
    sender = message['from'][0]['email']
    
    # Simple task parsing
    if subject.startswith('TASK:'):
        task_title = subject[5:].strip()
        
        # Extract due date, priority, etc. from body
        lines = body.split('\n')
        due_date = None
        priority = 'normal'
        description = body
        
        for line in lines:
            if line.startswith('Due:'):
                due_date = line[4:].strip()
            elif line.startswith('Priority:'):
                priority = line[9:].strip().lower()
        
        # Create task in your system
        task_id = create_task_in_system({
            'title': task_title,
            'description': description,
            'due_date': due_date,
            'priority': priority,
            'assigned_by': sender
        })
        
        # Confirm task creation
        client.inboxes.messages.send(
            inbox_id=message['inbox_id'],
            to=sender,
            subject=f"Task Created: {task_title} (#{task_id})",
            text=f"""
Task successfully created!

ID: #{task_id}
Title: {task_title}
Priority: {priority}
Due: {due_date or 'Not specified'}

I'll send updates as work progresses.

Best regards,
Task Bot
            """
        )
        
        # Start processing task...
        process_task_async(task_id)

def create_task_in_system(task_data):
    """Create task in your task management system"""
    # Implementation depends on your system
    # Return task ID
    return "T-12345"

def send_task_update(task_id, status, details, assignee_email):
    """Send task progress update"""
    
    client.inboxes.messages.send(
        inbox_id="tasks@agentmail.to",
        to=assignee_email,
        subject=f"Task Update: #{task_id} - {status}",
        text=f"""
Task #{task_id} Status Update

Status: {status}
Details: {details}

View full details: https://your-app.com/tasks/{task_id}

Best regards,
Task Bot
        """
    )

Integration with External Services

GitHub Issue Creation from Email

def setup_github_integration():
    """Create inbox for GitHub issue creation"""
    
    inbox = client.inboxes.create(
        username="github-issues",
        display_name="GitHub Issue Creator",
        client_id="github-integration"
    )
    
    return inbox

def create_github_issue_from_email(message):
    """Convert email to GitHub issue"""
    
    import requests
    
    # Extract issue details
    title = message['subject'].replace('BUG:', '').replace('FEATURE:', '').strip()
    body_content = message.get('text', '')
    sender = message['from'][0]['email']
    
    # Determine issue type and labels
    labels = ['email-created']
    if 'BUG:' in message['subject']:
        labels.append('bug')
    elif 'FEATURE:' in message['subject']:
        labels.append('enhancement')
    
    # Create GitHub issue
    github_token = os.getenv('GITHUB_TOKEN')
    repo = 'your-org/your-repo'
    
    issue_data = {
        'title': title,
        'body': f"""
**Reported via email by:** {sender}

**Original message:**
{body_content}

**Email Thread:** {message.get('thread_id')}
        """,
        'labels': labels
    }
    
    response = requests.post(
        f'https://api.github.com/repos/{repo}/issues',
        json=issue_data,
        headers={
            'Authorization': f'token {github_token}',
            'Accept': 'application/vnd.github.v3+json'
        }
    )
    
    if response.status_code == 201:
        issue = response.json()
        
        # Reply with GitHub issue link
        client.inboxes.messages.send(
            inbox_id=message['inbox_id'],
            to=sender,
            subject=f"Re: {message['subject']} - GitHub Issue Created",
            text=f"""
Thank you for your report!

I've created a GitHub issue for tracking:

Issue #{issue['number']}: {issue['title']}
Link: {issue['html_url']}

You can track progress and add comments directly on GitHub.

Best regards,
GitHub Bot
            """
        )
        
        print(f"Created GitHub issue #{issue['number']} from email")
    else:
        print(f"Failed to create GitHub issue: {response.text}")

# Usage in webhook handler
def handle_github_webhook(payload):
    if payload['event_type'] == 'message.received':
        message = payload['message']
        if message['inbox_id'] == 'github-issues@agentmail.to':
            create_github_issue_from_email(message)

Notification and Alert System

Multi-Channel Alerts

def setup_alert_system():
    """Create alert inbox for system notifications"""
    
    alerts_inbox = client.inboxes.create(
        username="alerts",
        display_name="System Alerts",
        client_id="alert-system"
    )
    
    return alerts_inbox

def send_system_alert(alert_type, message, severity='info', recipients=None):
    """Send system alert via email"""
    
    if recipients is None:
        recipients = ['admin@company.com', 'ops@company.com']
    
    severity_emoji = {
        'critical': '🚨',
        'warning': '⚠️',
        'info': '',
        'success': '✅'
    }
    
    emoji = severity_emoji.get(severity, '')
    
    client.inboxes.messages.send(
        inbox_id="alerts@agentmail.to",
        to=recipients,
        subject=f"{emoji} [{severity.upper()}] {alert_type}",
        text=f"""
System Alert

Type: {alert_type}
Severity: {severity}
Time: {datetime.now().isoformat()}

Message:
{message}

This is an automated alert from the monitoring system.
        """,
        html=f"""
<h2>{emoji} System Alert</h2>
<table>
<tr><td><strong>Type:</strong></td><td>{alert_type}</td></tr>
<tr><td><strong>Severity:</strong></td><td style="color: {'red' if severity == 'critical' else 'orange' if severity == 'warning' else 'blue'}">{severity}</td></tr>
<tr><td><strong>Time:</strong></td><td>{datetime.now().isoformat()}</td></tr>
</table>

<h3>Message:</h3>
<p>{message.replace(chr(10), '<br>')}</p>

<p><em>This is an automated alert from the monitoring system.</em></p>
        """
    )

# Usage examples
send_system_alert("Database Connection", "Unable to connect to primary database", "critical")
send_system_alert("Backup Complete", "Daily backup completed successfully", "success")
send_system_alert("High CPU Usage", "CPU usage above 80% for 5 minutes", "warning")

Testing and Development

Local Development Setup

def setup_dev_environment():
    """Set up AgentMail for local development"""
    
    # Create development inboxes
    dev_inbox = client.inboxes.create(
        username="dev-test",
        display_name="Development Testing",
        client_id="dev-testing"
    )
    
    print(f"Development inbox: {dev_inbox.inbox_id}")
    print("Use this for testing email workflows locally")
    
    # Test email sending
    test_response = client.inboxes.messages.send(
        inbox_id=dev_inbox.inbox_id,
        to="your-personal-email@gmail.com",
        subject="AgentMail Development Test",
        text="This is a test email from your AgentMail development setup."
    )
    
    print(f"Test email sent: {test_response.message_id}")
    
    return dev_inbox

# Run development setup
if __name__ == "__main__":
    setup_dev_environment()