# gptbot.py — Matrix chatbot backed by the OpenAI chat-completion API.
  1. import os
  2. import inspect
  3. import logging
  4. import signal
  5. import openai
  6. import asyncio
  7. import markdown2
  8. import tiktoken
  9. import duckdb
  10. from nio import AsyncClient, RoomMessageText, MatrixRoom, Event, InviteEvent
  11. from nio.api import MessageDirection
  12. from nio.responses import RoomMessagesError, SyncResponse, RoomRedactError
  13. from configparser import ConfigParser
  14. from datetime import datetime
  15. from argparse import ArgumentParser
# Globals
DATABASE = False  # duckdb connection handle once initialize_database() runs; False = disabled
DEFAULT_ROOM_NAME = "GPTBot"  # room name used by !gptbot newroom when none is supplied
SYSTEM_MESSAGE = "You are a helpful assistant. "  # sent as the "system" role message on every query
MAX_TOKENS = 3000  # token budget for the context window sent to the API (config: OpenAI.MaxTokens)
MAX_MESSAGES = 20  # max prior messages pulled in as context (config: OpenAI.MaxMessages)
DEFAULT_MODEL = "gpt-3.5-turbo"  # chat model; overridable via config OpenAI.Model
# Set up Matrix client
MATRIX_CLIENT = None  # nio.AsyncClient, constructed in the __main__ block
SYNC_TOKEN = None  # latest sync next_batch token, kept fresh by sync_cb
  26. def logging(message, log_level="info"):
  27. caller = inspect.currentframe().f_back.f_code.co_name
  28. timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S:%f")
  29. print(f"[{timestamp}] - {caller} - [{log_level.upper()}] {message}")
  30. async def gpt_query(messages, model=DEFAULT_MODEL):
  31. logging(f"Querying GPT with {len(messages)} messages")
  32. try:
  33. response = openai.ChatCompletion.create(
  34. model=model,
  35. messages=messages
  36. )
  37. result_text = response.choices[0].message['content']
  38. tokens_used = response.usage["total_tokens"]
  39. logging(f"Used {tokens_used} tokens")
  40. return result_text, tokens_used
  41. except Exception as e:
  42. logging(f"Error during GPT API call: {e}", "error")
  43. return None, 0
  44. async def fetch_last_n_messages(room_id, n=MAX_MESSAGES):
  45. global SYNC_TOKEN, MATRIX_CLIENT
  46. messages = []
  47. logging(
  48. f"Fetching last {2*n} messages from room {room_id} (starting at {SYNC_TOKEN})...")
  49. response = await MATRIX_CLIENT.room_messages(
  50. room_id=room_id,
  51. start=SYNC_TOKEN,
  52. limit=2*n,
  53. )
  54. if isinstance(response, RoomMessagesError):
  55. logging(
  56. f"Error fetching messages: {response.message} (status code {response.status_code})", "error")
  57. return []
  58. for event in response.chunk:
  59. if len(messages) >= n:
  60. break
  61. if isinstance(event, RoomMessageText):
  62. if not event.body.startswith("!"):
  63. messages.append(event)
  64. logging(f"Found {len(messages)} messages (limit: {n})")
  65. # Reverse the list so that messages are in chronological order
  66. return messages[::-1]
  67. def truncate_messages_to_fit_tokens(messages, max_tokens=MAX_TOKENS, model=DEFAULT_MODEL):
  68. global SYSTEM_MESSAGE
  69. encoding = tiktoken.encoding_for_model(model)
  70. total_tokens = 0
  71. system_message_tokens = len(encoding.encode(SYSTEM_MESSAGE)) + 1
  72. if system_message_tokens > max_tokens:
  73. logging(
  74. f"System message is too long to fit within token limit ({system_message_tokens} tokens) - cannot proceed", "error")
  75. return []
  76. total_tokens += system_message_tokens
  77. total_tokens = len(SYSTEM_MESSAGE) + 1
  78. truncated_messages = []
  79. for message in [messages[0]] + list(reversed(messages[1:])):
  80. content = message["content"]
  81. tokens = len(encoding.encode(content)) + 1
  82. if total_tokens + tokens > max_tokens:
  83. break
  84. total_tokens += tokens
  85. truncated_messages.append(message)
  86. return [truncated_messages[0]] + list(reversed(truncated_messages[1:]))
  87. async def process_query(room: MatrixRoom, event: RoomMessageText):
  88. global MATRIX_CLIENT, DATABASE, SYSTEM_MESSAGE
  89. await MATRIX_CLIENT.room_typing(room.room_id, True)
  90. await MATRIX_CLIENT.room_read_markers(room.room_id, event.event_id)
  91. last_messages = await fetch_last_n_messages(room.room_id, 20)
  92. chat_messages = [{"role": "system", "content": SYSTEM_MESSAGE}]
  93. for message in last_messages:
  94. role = "assistant" if message.sender == MATRIX_CLIENT.user_id else "user"
  95. if not message.event_id == event.event_id:
  96. chat_messages.append({"role": role, "content": message.body})
  97. chat_messages.append({"role": "user", "content": event.body})
  98. # Truncate messages to fit within the token limit
  99. truncated_messages = truncate_messages_to_fit_tokens(
  100. chat_messages, MAX_TOKENS - 1)
  101. response, tokens_used = await gpt_query(truncated_messages)
  102. if response:
  103. logging(f"Sending response to room {room.room_id}...")
  104. # Convert markdown to HTML
  105. markdowner = markdown2.Markdown(extras=["fenced-code-blocks"])
  106. formatted_body = markdowner.convert(response)
  107. message = await MATRIX_CLIENT.room_send(
  108. room.room_id, "m.room.message",
  109. {"msgtype": "m.text", "body": response,
  110. "format": "org.matrix.custom.html", "formatted_body": formatted_body}
  111. )
  112. if DATABASE:
  113. logging("Logging tokens used...")
  114. with DATABASE.cursor() as cursor:
  115. cursor.execute(
  116. "INSERT INTO token_usage (message_id, room_id, tokens, timestamp) VALUES (?, ?, ?, ?)",
  117. (message.event_id, room.room_id, tokens_used, datetime.now()))
  118. DATABASE.commit()
  119. else:
  120. # Send a notice to the room if there was an error
  121. logging("Error during GPT API call - sending notice to room")
  122. await MATRIX_CLIENT.room_send(
  123. room.room_id, "m.room.message", {
  124. "msgtype": "m.notice", "body": "Sorry, I'm having trouble connecting to the GPT API right now. Please try again later."}
  125. )
  126. print("No response from GPT API")
  127. await MATRIX_CLIENT.room_typing(room.room_id, False)
  128. async def command_newroom(room: MatrixRoom, event: RoomMessageText):
  129. room_name = " ".join(event.body.split()[2:]) or DEFAULT_ROOM_NAME
  130. logging("Creating new room...")
  131. new_room = await MATRIX_CLIENT.room_create(name=room_name)
  132. logging(f"Inviting {event.sender} to new room...")
  133. await MATRIX_CLIENT.room_invite(new_room.room_id, event.sender)
  134. await MATRIX_CLIENT.room_put_state(
  135. new_room.room_id, "m.room.power_levels", {"users": {event.sender: 100}})
  136. await MATRIX_CLIENT.room_send(
  137. new_room.room_id, "m.room.message", {"msgtype": "m.text", "body": "Welcome to the new room!"})
  138. async def command_help(room: MatrixRoom, event: RoomMessageText):
  139. await MATRIX_CLIENT.room_send(
  140. room.room_id, "m.room.message", {"msgtype": "m.notice",
  141. "body": """Available commands:
  142. !gptbot help - Show this message
  143. !gptbot newroom <room name> - Create a new room and invite yourself to it
  144. !gptbot stats - Show usage statistics for this room
  145. !gptbot botinfo - Show information about the bot
  146. """}
  147. )
  148. async def command_stats(room: MatrixRoom, event: RoomMessageText):
  149. global DATABASE, MATRIX_CLIENT
  150. logging("Showing stats...")
  151. if not DATABASE:
  152. logging("No database connection - cannot show stats")
  153. return
  154. with DATABASE.cursor() as cursor:
  155. cursor.execute(
  156. "SELECT SUM(tokens) FROM token_usage WHERE room_id = ?", (room.room_id,))
  157. total_tokens = cursor.fetchone()[0] or 0
  158. await MATRIX_CLIENT.room_send(
  159. room.room_id, "m.room.message", {"msgtype": "m.notice",
  160. "body": f"Total tokens used: {total_tokens}"}
  161. )
  162. async def command_unknown(room: MatrixRoom, event: RoomMessageText):
  163. global MATRIX_CLIENT
  164. logging("Unknown command")
  165. await MATRIX_CLIENT.room_send(
  166. room.room_id, "m.room.message", {"msgtype": "m.notice",
  167. "body": "Unknown command - try !gptbot help"}
  168. )
  169. async def command_botinfo(room: MatrixRoom, event: RoomMessageText):
  170. global MATRIX_CLIENT
  171. logging("Showing bot info...")
  172. await MATRIX_CLIENT.room_send(
  173. room.room_id, "m.room.message", {"msgtype": "m.notice",
  174. "body": f"""GPT Info:
  175. Model: {DEFAULT_MODEL}
  176. Maximum context tokens: {MAX_TOKENS}
  177. Maximum context messages: {MAX_MESSAGES}
  178. System message: {SYSTEM_MESSAGE}
  179. Room info:
  180. Bot user ID: {MATRIX_CLIENT.user_id}
  181. Current room ID: {room.room_id}
  182. For usage statistics, run !gptbot stats
  183. """})
# Dispatch table: the word after "!gptbot" maps to its handler coroutine.
# process_command falls back to command_unknown for anything not listed here.
COMMANDS = {
    "help": command_help,
    "newroom": command_newroom,
    "stats": command_stats,
    "botinfo": command_botinfo
}
  190. async def process_command(room: MatrixRoom, event: RoomMessageText):
  191. global COMMANDS
  192. logging(
  193. f"Received command {event.body} from {event.sender} in room {room.room_id}")
  194. command = event.body.split()[1] if event.body.split()[1:] else None
  195. await COMMANDS.get(command, command_unknown)(room, event)
  196. async def message_callback(room: MatrixRoom, event: RoomMessageText):
  197. global DEFAULT_ROOM_NAME, MATRIX_CLIENT, SYSTEM_MESSAGE, DATABASE, MAX_TOKENS
  198. logging(f"Received message from {event.sender} in room {room.room_id}")
  199. if event.sender == MATRIX_CLIENT.user_id:
  200. logging("Message is from bot itself - ignoring")
  201. elif event.body.startswith("!gptbot"):
  202. await process_command(room, event)
  203. elif event.body.startswith("!"):
  204. logging("Might be a command, but not for this bot - ignoring")
  205. else:
  206. await process_query(room, event)
  207. async def room_invite_callback(room: MatrixRoom, event):
  208. global MATRIX_CLIENT
  209. logging(f"Received invite to room {room.room_id} - joining...")
  210. await MATRIX_CLIENT.join(room.room_id)
  211. await MATRIX_CLIENT.room_send(
  212. room.room_id,
  213. "m.room.message",
  214. {"msgtype": "m.text",
  215. "body": "Hello! I'm a helpful assistant. How can I help you today?"}
  216. )
  217. async def accept_pending_invites():
  218. global MATRIX_CLIENT
  219. logging("Accepting pending invites...")
  220. for room_id in list(MATRIX_CLIENT.invited_rooms.keys()):
  221. logging(f"Joining room {room_id}...")
  222. await MATRIX_CLIENT.join(room_id)
  223. await MATRIX_CLIENT.room_send(
  224. room_id,
  225. "m.room.message",
  226. {"msgtype": "m.text",
  227. "body": "Hello! I'm a helpful assistant. How can I help you today?"}
  228. )
  229. async def sync_cb(response):
  230. global SYNC_TOKEN
  231. logging(
  232. f"Sync response received (next batch: {response.next_batch})", "debug")
  233. SYNC_TOKEN = response.next_batch
async def main():
    """Run the bot: resolve our identity, register callbacks, sync forever.

    Ordering matters: the first sync happens *before* the event callbacks are
    attached — NOTE(review): presumably so the startup backlog is not replayed
    through message_callback; confirm before reordering.
    """
    global MATRIX_CLIENT
    # Resolve the bot's own user ID via whoami when the config omitted it;
    # message_callback needs it to ignore the bot's own messages.
    if not MATRIX_CLIENT.user_id:
        whoami = await MATRIX_CLIENT.whoami()
        MATRIX_CLIENT.user_id = whoami.user_id
    try:
        assert MATRIX_CLIENT.user_id
    except AssertionError:
        logging(
            "Failed to get user ID - check your access token or try setting it manually", "critical")
        await MATRIX_CLIENT.close()
        return
    logging("Starting bot...")
    # Keep SYNC_TOKEN fresh on every sync response.
    MATRIX_CLIENT.add_response_callback(sync_cb, SyncResponse)
    logging("Syncing...")
    await MATRIX_CLIENT.sync(timeout=30000)
    MATRIX_CLIENT.add_event_callback(message_callback, RoomMessageText)
    MATRIX_CLIENT.add_event_callback(room_invite_callback, InviteEvent)
    await accept_pending_invites()  # Accept pending invites
    logging("Bot started")
    try:
        # Continue syncing events
        await MATRIX_CLIENT.sync_forever(timeout=30000)
    finally:
        # Runs on any exit path (including cancellation): flush one last
        # sync, then close the underlying aiohttp session.
        logging("Syncing one last time...")
        await MATRIX_CLIENT.sync(timeout=30000)
        await MATRIX_CLIENT.close()  # Properly close the aiohttp client session
        logging("Bot stopped")
  262. def initialize_database(path):
  263. global DATABASE
  264. logging("Initializing database...")
  265. DATABASE = duckdb.connect(path)
  266. with DATABASE.cursor() as cursor:
  267. # Get the latest migration ID if the migrations table exists
  268. try:
  269. cursor.execute(
  270. """
  271. SELECT MAX(id) FROM migrations
  272. """
  273. )
  274. latest_migration = int(cursor.fetchone()[0])
  275. except:
  276. latest_migration = 0
  277. # Version 1
  278. if latest_migration < 1:
  279. cursor.execute(
  280. """
  281. CREATE TABLE IF NOT EXISTS token_usage (
  282. message_id TEXT PRIMARY KEY,
  283. room_id TEXT NOT NULL,
  284. tokens INTEGER NOT NULL,
  285. timestamp TIMESTAMP NOT NULL
  286. )
  287. """
  288. )
  289. cursor.execute(
  290. """
  291. CREATE TABLE IF NOT EXISTS migrations (
  292. id INTEGER NOT NULL,
  293. timestamp TIMESTAMP NOT NULL
  294. )
  295. """
  296. )
  297. cursor.execute(
  298. "INSERT INTO migrations (id, timestamp) VALUES (1, ?)",
  299. (datetime.now(),)
  300. )
  301. DATABASE.commit()
  302. if __name__ == "__main__":
  303. # Parse command line arguments
  304. parser = ArgumentParser()
  305. parser.add_argument(
  306. "--config", help="Path to config file (default: config.ini in working directory)", default="config.ini")
  307. args = parser.parse_args()
  308. # Read config file
  309. config = ConfigParser()
  310. config.read(args.config)
  311. # Set up Matrix client
  312. try:
  313. assert "Matrix" in config
  314. assert "Homeserver" in config["Matrix"]
  315. assert "AccessToken" in config["Matrix"]
  316. except:
  317. logging("Matrix config not found or incomplete", "critical")
  318. exit(1)
  319. MATRIX_CLIENT = AsyncClient(config["Matrix"]["Homeserver"])
  320. MATRIX_CLIENT.access_token = config["Matrix"]["AccessToken"]
  321. MATRIX_CLIENT.user_id = config["Matrix"].get("UserID")
  322. # Set up GPT API
  323. try:
  324. assert "OpenAI" in config
  325. assert "APIKey" in config["OpenAI"]
  326. except:
  327. logging("OpenAI config not found or incomplete", "critical")
  328. exit(1)
  329. openai.api_key = config["OpenAI"]["APIKey"]
  330. if "Model" in config["OpenAI"]:
  331. DEFAULT_MODEL = config["OpenAI"]["Model"]
  332. if "MaxTokens" in config["OpenAI"]:
  333. MAX_TOKENS = int(config["OpenAI"]["MaxTokens"])
  334. if "MaxMessages" in config["OpenAI"]:
  335. MAX_MESSAGES = int(config["OpenAI"]["MaxMessages"])
  336. # Set up database
  337. if "Database" in config and config["Database"].get("Path"):
  338. initialize_database(config["Database"]["Path"])
  339. # Start bot loop
  340. try:
  341. asyncio.run(main())
  342. except KeyboardInterrupt:
  343. logging("Received KeyboardInterrupt - exiting...")
  344. except signal.SIGTERM:
  345. logging("Received SIGTERM - exiting...")
  346. finally:
  347. if DATABASE:
  348. DATABASE.close()