142 lines
5.7 KiB
Python
142 lines
5.7 KiB
Python
from typing import Tuple
|
|
|
|
from botbuilder.core import ActivityHandler, TurnContext
|
|
from botbuilder.schema import ChannelAccount, Mention, Activity, ActivityTypes
|
|
|
|
from config import DefaultConfig
|
|
|
|
from commands.webhook import WebhookCommand
|
|
from commands.test import TestCommand
|
|
from modules.database import DatabaseManager
|
|
from modules.command import Command
|
|
|
|
import re
|
|
|
|
class WebhookBot(ActivityHandler):
    """Webhook bot for Microsoft Teams.

    Handles incoming activities, recognises prefixed commands in channel
    messages and forwards them to ``Command.handle_command``.
    """

    def __init__(self, config: DefaultConfig, database: DatabaseManager) -> None:
        """Create the bot.

        Args:
            config: Configuration object supplying ``COMMAND_PREFIX``.
            database: Database manager shared with the command handlers.
        """
        super().__init__()
        # Honour the injected configuration; fall back to the class-level
        # default so callers that passed an object without the attribute
        # keep working exactly as before.
        self.command_prefix = getattr(
            config, "COMMAND_PREFIX", DefaultConfig.COMMAND_PREFIX
        )
        self.webhook_manager = WebhookCommand(database)
        self.database = database

    def is_command(self, text: str) -> tuple[bool, str]:
        """Check if the text is a command.

        Args:
            text: Message text, already stripped of mentions and whitespace.

        Returns:
            ``(True, name)`` when ``text`` starts with the command prefix and
            is longer than the prefix, where ``name`` is everything between
            the prefix and the first space (it is ``""`` when a space follows
            the prefix directly). ``(False, "")`` otherwise.
        """
        prefix = self.command_prefix
        if text.startswith(prefix) and len(text) > len(prefix):
            return True, text[len(prefix):].split(" ")[0]
        return False, ""

    def has_sub_command(self, text: str) -> tuple[bool, str | None]:
        """Check if the text has a sub command.

        Returns:
            ``(True, token)`` with the second space-separated token of
            ``text`` when there is one, ``(False, None)`` otherwise.
        """
        tokens = text.split(" ")
        if len(tokens) > 1:
            return True, tokens[1]
        return False, None

    def is_bot_mentioned(self, turn_context: TurnContext) -> bool:
        """Check if the bot is mentioned in the activity.

        The bot is identified by ``activity.recipient.id``. A mention entity
        matches when either its ``mentioned.id`` attribute or the ``id`` of
        the ``mentioned`` dict inside its ``properties`` dict equals that id.

        Returns:
            True if any mention entity refers to the bot, False otherwise
            (including when the recipient, its id, or the entities are
            missing).
        """
        activity = turn_context.activity

        # Get bot ID from recipient
        if activity.recipient is None:
            return False
        bot_id = activity.recipient.id
        if bot_id is None:
            # Without an id nothing can match; comparing None == None below
            # would otherwise report a mention for entities lacking an id.
            return False

        # Check entities for mentions
        if activity.entities is None:
            return False

        for entity in activity.entities:
            if entity.type != "mention":
                continue
            try:
                # Check if entity has mentioned property (ChannelAccount)
                mentioned_account = getattr(entity, "mentioned", None)
                if mentioned_account:
                    if getattr(mentioned_account, "id", None) == bot_id:
                        return True

                # Check properties dict (alternative format)
                props = getattr(entity, "properties", None)
                if props and isinstance(props, dict):
                    mentioned = props.get("mentioned", {})
                    if isinstance(mentioned, dict) and mentioned.get("id") == bot_id:
                        return True
            except Exception:
                # Deliberate best-effort: a malformed entity must not stop
                # the remaining entities from being inspected.
                continue
        return False

    def is_channel_conversation(self, turn_context: TurnContext) -> bool:
        """Check if this is a channel conversation.

        Returns:
            True when the activity's conversation has
            ``conversation_type == "channel"``; False when there is no
            conversation or the attribute is absent or different.
        """
        conversation = turn_context.activity.conversation
        if conversation is None:
            return False
        # In Teams, channel conversations have conversation_type == "channel"
        return getattr(conversation, "conversation_type", None) == "channel"

    def remove_mentions_from_text(self, text: str, activity) -> str:
        """Remove mention text from the message text.

        Args:
            text: Raw message text.
            activity: Unused; kept so existing callers keep working.

        Returns:
            ``text`` with every ``<at>...</at>`` span removed. Surrounding
            whitespace is left for the caller to strip.
        """
        # Non-greedy so that several mentions on one line are removed
        # individually rather than as one span.
        return re.sub(r'<at>.*?</at>', '', text)

    def send_message(self, channel_id: str, message: str) -> None:
        """Build a message activity for ``channel_id``.

        NOTE(review): the activity is constructed but never sent or
        returned, so this method currently has no observable effect.
        Presumably a send step is still to be added - confirm intent.
        """
        channel_account = ChannelAccount(id=channel_id)
        activity = Activity(  # noqa: F841 - see NOTE(review) above
            type=ActivityTypes.message,
            text=message,
            channel_id=channel_id,
            from_property=channel_account
        )

    async def on_command(self, command: str, turn_context: TurnContext) -> None:
        """Handle the command by acknowledging it to the sender."""
        await turn_context.send_activity(f"Command {command} received.")

    async def on_members_added_activity(self, members_added: list[ChannelAccount], turn_context: TurnContext) -> None:
        """Handle the members added activity.

        Sends a welcome message for every added member whose id differs
        from the activity recipient's id. Does nothing when the activity
        has no recipient.
        """
        recipient = turn_context.activity.recipient
        if recipient is None:
            return
        for member in members_added:
            if member.id != recipient.id:
                await turn_context.send_activity("Hello and welcome!")

    async def on_channel_created_activity(self, turn_context: TurnContext) -> None:
        """Handle the channel created activity.

        Replies with the activity's ``channel_id``; does nothing when the
        activity has no recipient.

        NOTE(review): confirm that the base handler actually dispatches to
        a method with this name, and that ``activity.channel_id`` is the
        identifier intended for the reply text.
        """
        if turn_context.activity.recipient is None:
            return
        await turn_context.send_activity(f"Channel created: {turn_context.activity.channel_id}")

    async def on_message_activity(self, turn_context: TurnContext) -> None:
        """Handle the message activity for channel posts.

        Messages from non-channel conversations are ignored. For channel
        messages the mentions are removed and the text is stripped; if the
        result is a command it is forwarded to ``Command.handle_command``
        together with the remaining text, otherwise the text is echoed back.
        """
        activity = turn_context.activity

        # Only channel conversations are processed.
        if not self.is_channel_conversation(turn_context):
            return

        # Get text from activity - can be None for channel posts with only attachments
        text = activity.text or ""

        # Remove mentions from text for processing, then trim whitespace
        text = self.remove_mentions_from_text(text, activity).strip()

        # Everything after the first space is the command's argument string;
        # None when the text contains no space at all.
        _, separator, remainder = text.partition(" ")
        message = remainder if separator else None

        is_command, command = self.is_command(text)

        if is_command:
            cmd = Command(turn_context, self.database)
            await cmd.handle_command(command, message)
        else:
            await turn_context.send_activity(f"Message: {text}")