123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480 |
- import os
- import inspect
- import logging
- import signal
- import openai
- import asyncio
- import markdown2
- import tiktoken
- import duckdb
- from nio import AsyncClient, RoomMessageText, MatrixRoom, Event, InviteEvent
- from nio.api import MessageDirection
- from nio.responses import RoomMessagesError, SyncResponse, RoomRedactError
- from configparser import ConfigParser
- from datetime import datetime
- from argparse import ArgumentParser
# Globals

# duckdb connection handle; stays False until initialize_database() runs.
DATABASE = False
# Room name used by "!gptbot newroom" when the user supplies none.
DEFAULT_ROOM_NAME = "GPTBot"
# System prompt prepended to every GPT conversation.
SYSTEM_MESSAGE = "You are a helpful assistant. "
# Token budget for the chat context sent to the GPT API (config: OpenAI.MaxTokens).
MAX_TOKENS = 3000
# Maximum number of history messages pulled in as context (config: OpenAI.MaxMessages).
MAX_MESSAGES = 20
# OpenAI chat model name; may be overridden from config (OpenAI.Model).
DEFAULT_MODEL = "gpt-3.5-turbo"

# Set up Matrix client
# nio AsyncClient instance, created in the __main__ block from config.
MATRIX_CLIENT = None
# Latest sync "next_batch" token; updated by sync_cb, used as pagination start.
SYNC_TOKEN = None
def logging(message, log_level="info"):
    """Print a timestamped log line tagged with the calling function's name.

    NOTE(review): this function shadows the stdlib `logging` module imported
    at the top of the file. Every other function here calls it, so the name
    is kept for interface compatibility.

    Args:
        message: Text to log.
        log_level: Severity label, printed upper-cased (default "info").
    """
    frame = inspect.currentframe()
    # f_back is None when called from module top level; don't crash there.
    caller = frame.f_back.f_code.co_name if frame and frame.f_back else "<module>"
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S:%f")
    print(f"[{timestamp}] - {caller} - [{log_level.upper()}] {message}")
async def gpt_query(messages, model=DEFAULT_MODEL):
    """Request a chat completion from the OpenAI API.

    The openai.ChatCompletion API is synchronous; the call is pushed onto the
    default executor so the network wait does not block the event loop (the
    bot would otherwise stop responding to sync events mid-request).

    Args:
        messages: List of {"role": ..., "content": ...} dicts.
        model: Chat model name (default DEFAULT_MODEL).

    Returns:
        Tuple (response_text, tokens_used); (None, 0) on any API error.
    """
    logging(f"Querying GPT with {len(messages)} messages")
    try:
        loop = asyncio.get_running_loop()
        # Run the blocking API call off the event loop thread.
        response = await loop.run_in_executor(
            None,
            lambda: openai.ChatCompletion.create(model=model, messages=messages),
        )
        result_text = response.choices[0].message['content']
        tokens_used = response.usage["total_tokens"]
        logging(f"Used {tokens_used} tokens")
        return result_text, tokens_used
    except Exception as e:
        logging(f"Error during GPT API call: {e}", "error")
        return None, 0
async def fetch_last_n_messages(room_id, n=MAX_MESSAGES):
    """Fetch up to n of the most recent non-command text messages in a room.

    Paginates backwards from the current SYNC_TOKEN, over-fetching 2*n events
    to compensate for non-text events and filtered-out "!" commands.

    Args:
        room_id: Matrix room ID to read from.
        n: Maximum number of messages to return (default MAX_MESSAGES).

    Returns:
        Messages in chronological order (oldest first); [] on fetch error.
    """
    global SYNC_TOKEN, MATRIX_CLIENT
    messages = []
    logging(
        f"Fetching last {2*n} messages from room {room_id} (starting at {SYNC_TOKEN})...")
    response = await MATRIX_CLIENT.room_messages(
        room_id=room_id,
        start=SYNC_TOKEN,
        limit=2*n,
        # Make the (default) backwards pagination explicit; this also uses
        # the MessageDirection import instead of leaving it dangling.
        direction=MessageDirection.back,
    )
    if isinstance(response, RoomMessagesError):
        logging(
            f"Error fetching messages: {response.message} (status code {response.status_code})", "error")
        return []
    for event in response.chunk:
        if len(messages) >= n:
            break
        # Keep only plain text messages that are not bot commands.
        if isinstance(event, RoomMessageText) and not event.body.startswith("!"):
            messages.append(event)
    logging(f"Found {len(messages)} messages (limit: {n})")
    # Events arrive newest-first; reverse into chronological order.
    return messages[::-1]
def truncate_messages_to_fit_tokens(messages, max_tokens=MAX_TOKENS, model=DEFAULT_MODEL):
    """Drop oldest chat messages until the context fits within max_tokens.

    The first entry of `messages` (the system message) is always kept; the
    remaining entries are considered newest-first so the most recent context
    survives truncation.

    Args:
        messages: List of {"role": ..., "content": ...} dicts; index 0 is the
            system message.
        max_tokens: Token budget for the whole context.
        model: Model name used to pick the tiktoken encoding.

    Returns:
        Truncated message list in original (chronological) order, or [] if
        even the system message exceeds the budget.
    """
    global SYSTEM_MESSAGE
    encoding = tiktoken.encoding_for_model(model)
    # +1 per message as rough per-message overhead.
    system_message_tokens = len(encoding.encode(SYSTEM_MESSAGE)) + 1
    if system_message_tokens > max_tokens:
        logging(
            f"System message is too long to fit within token limit ({system_message_tokens} tokens) - cannot proceed", "error")
        return []
    # BUG FIX: the budget used to be overwritten with the system message's
    # *character* length (len(SYSTEM_MESSAGE) + 1) right after the token
    # count was computed; keep the token-based count.
    total_tokens = system_message_tokens
    truncated_messages = []
    # System message first, then newest messages first until budget runs out.
    for message in [messages[0]] + list(reversed(messages[1:])):
        content = message["content"]
        tokens = len(encoding.encode(content)) + 1
        if total_tokens + tokens > max_tokens:
            break
        total_tokens += tokens
        truncated_messages.append(message)
    # Guard the empty case (previously an IndexError) before restoring order.
    if not truncated_messages:
        return []
    # Restore chronological order (system message stays first).
    return [truncated_messages[0]] + list(reversed(truncated_messages[1:]))
async def process_query(room: MatrixRoom, event: RoomMessageText):
    """Answer a user message with a GPT completion and log token usage.

    Pulls recent room history for context, truncates it to the token budget,
    queries GPT, and sends the markdown-rendered reply. On API failure a
    notice is posted instead. The typing indicator is active for the
    duration of the request.
    """
    global MATRIX_CLIENT, DATABASE, SYSTEM_MESSAGE
    await MATRIX_CLIENT.room_typing(room.room_id, True)
    await MATRIX_CLIENT.room_read_markers(room.room_id, event.event_id)
    # Use the configurable limit instead of a hard-coded 20 so the
    # OpenAI.MaxMessages config option actually takes effect.
    last_messages = await fetch_last_n_messages(room.room_id, MAX_MESSAGES)
    chat_messages = [{"role": "system", "content": SYSTEM_MESSAGE}]
    for message in last_messages:
        role = "assistant" if message.sender == MATRIX_CLIENT.user_id else "user"
        # The triggering event is appended separately below; skip it here.
        if message.event_id != event.event_id:
            chat_messages.append({"role": role, "content": message.body})
    chat_messages.append({"role": "user", "content": event.body})
    # Truncate messages to fit within the token limit
    truncated_messages = truncate_messages_to_fit_tokens(
        chat_messages, MAX_TOKENS - 1)
    response, tokens_used = await gpt_query(truncated_messages)
    if response:
        logging(f"Sending response to room {room.room_id}...")
        # Convert markdown to HTML
        markdowner = markdown2.Markdown(extras=["fenced-code-blocks"])
        formatted_body = markdowner.convert(response)
        message = await MATRIX_CLIENT.room_send(
            room.room_id, "m.room.message",
            {"msgtype": "m.text", "body": response,
             "format": "org.matrix.custom.html", "formatted_body": formatted_body}
        )
        if DATABASE:
            logging("Logging tokens used...")
            with DATABASE.cursor() as cursor:
                cursor.execute(
                    "INSERT INTO token_usage (message_id, room_id, tokens, timestamp) VALUES (?, ?, ?, ?)",
                    (message.event_id, room.room_id, tokens_used, datetime.now()))
            DATABASE.commit()
    else:
        # Send a notice to the room if there was an error
        logging("Error during GPT API call - sending notice to room")
        await MATRIX_CLIENT.room_send(
            room.room_id, "m.room.message", {
                "msgtype": "m.notice", "body": "Sorry, I'm having trouble connecting to the GPT API right now. Please try again later."}
        )
        # Use the logging helper consistently instead of a bare print().
        logging("No response from GPT API", "error")
    await MATRIX_CLIENT.room_typing(room.room_id, False)
async def command_newroom(room: MatrixRoom, event: RoomMessageText):
    """Create a new room, invite the requester, and grant them admin rights.

    Syntax: "!gptbot newroom <room name>"; DEFAULT_ROOM_NAME is used when no
    name is supplied.
    """
    requested_name = " ".join(event.body.split()[2:])
    room_name = requested_name if requested_name else DEFAULT_ROOM_NAME
    logging("Creating new room...")
    new_room = await MATRIX_CLIENT.room_create(name=room_name)
    logging(f"Inviting {event.sender} to new room...")
    await MATRIX_CLIENT.room_invite(new_room.room_id, event.sender)
    # Give the requester full power in the room they asked for.
    power_levels = {"users": {event.sender: 100}}
    await MATRIX_CLIENT.room_put_state(
        new_room.room_id, "m.room.power_levels", power_levels)
    greeting = {"msgtype": "m.text", "body": "Welcome to the new room!"}
    await MATRIX_CLIENT.room_send(new_room.room_id, "m.room.message", greeting)
async def command_help(room: MatrixRoom, event: RoomMessageText):
    """Send the list of available bot commands to the room as a notice."""
    help_text = """Available commands:
!gptbot help - Show this message
!gptbot newroom <room name> - Create a new room and invite yourself to it
!gptbot stats - Show usage statistics for this room
!gptbot botinfo - Show information about the bot
"""
    notice = {"msgtype": "m.notice", "body": help_text}
    await MATRIX_CLIENT.room_send(room.room_id, "m.room.message", notice)
async def command_stats(room: MatrixRoom, event: RoomMessageText):
    """Report the total tokens consumed in this room, from the usage table."""
    global DATABASE, MATRIX_CLIENT
    logging("Showing stats...")
    if not DATABASE:
        logging("No database connection - cannot show stats")
        return
    with DATABASE.cursor() as cursor:
        cursor.execute(
            "SELECT SUM(tokens) FROM token_usage WHERE room_id = ?",
            (room.room_id,))
        row = cursor.fetchone()
        # SUM() yields NULL for an empty result set; report 0 instead.
        total_tokens = row[0] or 0
    notice = {"msgtype": "m.notice", "body": f"Total tokens used: {total_tokens}"}
    await MATRIX_CLIENT.room_send(room.room_id, "m.room.message", notice)
async def command_unknown(room: MatrixRoom, event: RoomMessageText):
    """Fallback handler: tell the room the command was not recognized."""
    global MATRIX_CLIENT
    logging("Unknown command")
    notice = {"msgtype": "m.notice", "body": "Unknown command - try !gptbot help"}
    await MATRIX_CLIENT.room_send(room.room_id, "m.room.message", notice)
async def command_botinfo(room: MatrixRoom, event: RoomMessageText):
    """Post the bot's model/limit configuration and room identifiers."""
    global MATRIX_CLIENT
    logging("Showing bot info...")
    info_text = f"""GPT Info:
Model: {DEFAULT_MODEL}
Maximum context tokens: {MAX_TOKENS}
Maximum context messages: {MAX_MESSAGES}
System message: {SYSTEM_MESSAGE}
Room info:
Bot user ID: {MATRIX_CLIENT.user_id}
Current room ID: {room.room_id}
For usage statistics, run !gptbot stats
"""
    notice = {"msgtype": "m.notice", "body": info_text}
    await MATRIX_CLIENT.room_send(room.room_id, "m.room.message", notice)
# Dispatch table mapping "!gptbot <command>" names to their async handlers;
# process_command() falls back to command_unknown for anything not listed.
COMMANDS = {
    "help": command_help,
    "newroom": command_newroom,
    "stats": command_stats,
    "botinfo": command_botinfo
}
async def process_command(room: MatrixRoom, event: RoomMessageText):
    """Dispatch a "!gptbot ..." message to the matching command handler.

    Falls back to command_unknown when no sub-command is given or the name
    is not in COMMANDS.
    """
    global COMMANDS
    logging(
        f"Received command {event.body} from {event.sender} in room {room.room_id}")
    # Split once instead of twice; the command is the word after "!gptbot".
    parts = event.body.split()
    command = parts[1] if len(parts) > 1 else None
    await COMMANDS.get(command, command_unknown)(room, event)
async def message_callback(room: MatrixRoom, event: RoomMessageText):
    """Route an incoming text message.

    Ignores the bot's own messages and commands addressed to other bots;
    runs "!gptbot" commands; treats everything else as a GPT query.
    """
    global DEFAULT_ROOM_NAME, MATRIX_CLIENT, SYSTEM_MESSAGE, DATABASE, MAX_TOKENS
    logging(f"Received message from {event.sender} in room {room.room_id}")
    # Guard clauses instead of an if/elif chain.
    if event.sender == MATRIX_CLIENT.user_id:
        logging("Message is from bot itself - ignoring")
        return
    if event.body.startswith("!gptbot"):
        await process_command(room, event)
        return
    if event.body.startswith("!"):
        logging("Might be a command, but not for this bot - ignoring")
        return
    await process_query(room, event)
async def room_invite_callback(room: MatrixRoom, event):
    """Join a room the bot was invited to and post a greeting."""
    global MATRIX_CLIENT
    logging(f"Received invite to room {room.room_id} - joining...")
    await MATRIX_CLIENT.join(room.room_id)
    greeting = {
        "msgtype": "m.text",
        "body": "Hello! I'm a helpful assistant. How can I help you today?",
    }
    await MATRIX_CLIENT.room_send(room.room_id, "m.room.message", greeting)
async def accept_pending_invites():
    """Join every room with a pending invite and greet each one.

    Run once at startup to catch invites received while the bot was offline.
    """
    global MATRIX_CLIENT
    logging("Accepting pending invites...")
    greeting = {
        "msgtype": "m.text",
        "body": "Hello! I'm a helpful assistant. How can I help you today?",
    }
    # Snapshot the keys; joining mutates the invited_rooms mapping.
    pending = list(MATRIX_CLIENT.invited_rooms.keys())
    for room_id in pending:
        logging(f"Joining room {room_id}...")
        await MATRIX_CLIENT.join(room_id)
        await MATRIX_CLIENT.room_send(room_id, "m.room.message", greeting)
async def sync_cb(response):
    """Remember the newest sync batch token for later history pagination."""
    global SYNC_TOKEN
    next_batch = response.next_batch
    logging(f"Sync response received (next batch: {next_batch})", "debug")
    SYNC_TOKEN = next_batch
async def main():
    """Resolve the bot's user ID, register callbacks, and sync forever.

    Performs an initial sync before registering the message callback so the
    bot does not replay and answer historical messages, then syncs until
    cancelled, closing the client session on the way out.
    """
    global MATRIX_CLIENT
    if not MATRIX_CLIENT.user_id:
        whoami = await MATRIX_CLIENT.whoami()
        # whoami may be an error response without a user_id attribute.
        MATRIX_CLIENT.user_id = getattr(whoami, "user_id", None)
    # Explicit check rather than assert: asserts are stripped under -O.
    if not MATRIX_CLIENT.user_id:
        logging(
            "Failed to get user ID - check your access token or try setting it manually",
            "critical")
        await MATRIX_CLIENT.close()
        return
    logging("Starting bot...")
    MATRIX_CLIENT.add_response_callback(sync_cb, SyncResponse)
    logging("Syncing...")
    await MATRIX_CLIENT.sync(timeout=30000)
    MATRIX_CLIENT.add_event_callback(message_callback, RoomMessageText)
    MATRIX_CLIENT.add_event_callback(room_invite_callback, InviteEvent)
    await accept_pending_invites()  # Accept pending invites
    logging("Bot started")
    try:
        # Continue syncing events
        await MATRIX_CLIENT.sync_forever(timeout=30000)
    finally:
        logging("Syncing one last time...")
        await MATRIX_CLIENT.sync(timeout=30000)
        await MATRIX_CLIENT.close()  # Properly close the aiohttp client session
        logging("Bot stopped")
def initialize_database(path):
    """Open (or create) the duckdb database at `path` and run migrations.

    Sets the module-level DATABASE handle. Migration state is tracked in a
    `migrations` table; version 1 creates the token_usage table.

    Args:
        path: Filesystem path of the duckdb database file.
    """
    global DATABASE
    logging("Initializing database...")
    DATABASE = duckdb.connect(path)
    with DATABASE.cursor() as cursor:
        # Get the latest migration ID if the migrations table exists
        try:
            cursor.execute(
                """
                SELECT MAX(id) FROM migrations
                """
            )
            latest_migration = int(cursor.fetchone()[0])
        except Exception:
            # Narrowed from a bare `except:` (which also swallowed
            # KeyboardInterrupt/SystemExit). A fresh database has no
            # migrations table, and MAX(id) is NULL when it is empty.
            latest_migration = 0
        # Version 1
        if latest_migration < 1:
            cursor.execute(
                """
                CREATE TABLE IF NOT EXISTS token_usage (
                    message_id TEXT PRIMARY KEY,
                    room_id TEXT NOT NULL,
                    tokens INTEGER NOT NULL,
                    timestamp TIMESTAMP NOT NULL
                )
                """
            )
            cursor.execute(
                """
                CREATE TABLE IF NOT EXISTS migrations (
                    id INTEGER NOT NULL,
                    timestamp TIMESTAMP NOT NULL
                )
                """
            )
            cursor.execute(
                "INSERT INTO migrations (id, timestamp) VALUES (1, ?)",
                (datetime.now(),)
            )
        DATABASE.commit()
if __name__ == "__main__":
    # Parse command line arguments
    parser = ArgumentParser()
    parser.add_argument(
        "--config",
        help="Path to config file (default: config.ini in working directory)",
        default="config.ini")
    args = parser.parse_args()

    # Read config file
    config = ConfigParser()
    config.read(args.config)

    # Set up Matrix client. Explicit checks instead of assert + bare except:
    # asserts are stripped under -O and a bare except hides real errors.
    if "Matrix" not in config or not all(
            key in config["Matrix"] for key in ("Homeserver", "AccessToken")):
        logging("Matrix config not found or incomplete", "critical")
        exit(1)

    MATRIX_CLIENT = AsyncClient(config["Matrix"]["Homeserver"])
    MATRIX_CLIENT.access_token = config["Matrix"]["AccessToken"]
    MATRIX_CLIENT.user_id = config["Matrix"].get("UserID")

    # Set up GPT API
    if "OpenAI" not in config or "APIKey" not in config["OpenAI"]:
        logging("OpenAI config not found or incomplete", "critical")
        exit(1)

    openai.api_key = config["OpenAI"]["APIKey"]
    if "Model" in config["OpenAI"]:
        DEFAULT_MODEL = config["OpenAI"]["Model"]
    if "MaxTokens" in config["OpenAI"]:
        MAX_TOKENS = int(config["OpenAI"]["MaxTokens"])
    if "MaxMessages" in config["OpenAI"]:
        MAX_MESSAGES = int(config["OpenAI"]["MaxMessages"])

    # Set up database
    if "Database" in config and config["Database"].get("Path"):
        initialize_database(config["Database"]["Path"])

    # BUG FIX: `except signal.SIGTERM:` is invalid -- SIGTERM is an int, not
    # an exception class, so that clause raised TypeError whenever any
    # exception other than KeyboardInterrupt propagated. Convert SIGTERM
    # into SystemExit via a signal handler instead.
    def _on_sigterm(signum, frame):
        raise SystemExit(0)

    signal.signal(signal.SIGTERM, _on_sigterm)

    # Start bot loop
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logging("Received KeyboardInterrupt - exiting...")
    except SystemExit:
        logging("Received SIGTERM - exiting...")
    finally:
        if DATABASE:
            DATABASE.close()
|