store.py 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730
  1. import duckdb
  2. from nio.store.database import MatrixStore, DeviceTrustState, OlmDevice, TrustState, InboundGroupSession, SessionStore, OlmSessions, GroupSessionStore, OutgoingKeyRequest, DeviceStore
  3. from nio.crypto import OlmAccount, OlmDevice
  4. from random import SystemRandom
  5. from collections import defaultdict
  6. from typing import Dict, List, Optional, Tuple
  7. import json
  8. class DuckDBStore(MatrixStore):
  9. @property
  10. def account_id(self):
  11. id = self._get_account()[0] if self._get_account() else None
  12. if id is None:
  13. id = SystemRandom().randint(0, 2**16)
  14. return id
  15. def __init__(self, user_id, device_id, duckdb_conn):
  16. self.conn = duckdb_conn
  17. self.user_id = user_id
  18. self.device_id = device_id
  19. self._create_tables()
  20. def _create_tables(self):
  21. with self.conn.cursor() as cursor:
  22. cursor.execute("""
  23. DROP TABLE IF EXISTS sync_tokens CASCADE;
  24. DROP TABLE IF EXISTS encrypted_rooms CASCADE;
  25. DROP TABLE IF EXISTS outgoing_key_requests CASCADE;
  26. DROP TABLE IF EXISTS forwarded_chains CASCADE;
  27. DROP TABLE IF EXISTS outbound_group_sessions CASCADE;
  28. DROP TABLE IF EXISTS inbound_group_sessions CASCADE;
  29. DROP TABLE IF EXISTS olm_sessions CASCADE;
  30. DROP TABLE IF EXISTS device_trust_state CASCADE;
  31. DROP TABLE IF EXISTS keys CASCADE;
  32. DROP TABLE IF EXISTS device_keys_key CASCADE;
  33. DROP TABLE IF EXISTS device_keys CASCADE;
  34. DROP TABLE IF EXISTS accounts CASCADE;
  35. """)
  36. # Create accounts table
  37. cursor.execute("""
  38. CREATE TABLE IF NOT EXISTS accounts (
  39. id INTEGER PRIMARY KEY,
  40. user_id VARCHAR NOT NULL,
  41. device_id VARCHAR NOT NULL,
  42. shared_account INTEGER NOT NULL,
  43. pickle VARCHAR NOT NULL
  44. );
  45. """)
  46. # Create device_keys table
  47. cursor.execute("""
  48. CREATE TABLE IF NOT EXISTS device_keys (
  49. device_id TEXT PRIMARY KEY,
  50. account_id INTEGER NOT NULL,
  51. user_id TEXT NOT NULL,
  52. display_name TEXT,
  53. deleted BOOLEAN NOT NULL DEFAULT 0,
  54. UNIQUE (account_id, user_id, device_id),
  55. FOREIGN KEY (account_id) REFERENCES accounts(id) ON DELETE CASCADE
  56. );
  57. CREATE TABLE IF NOT EXISTS keys (
  58. key_type TEXT NOT NULL,
  59. key TEXT NOT NULL,
  60. device_id VARCHAR NOT NULL,
  61. UNIQUE (key_type, device_id),
  62. FOREIGN KEY (device_id) REFERENCES device_keys(device_id) ON DELETE CASCADE
  63. );
  64. """)
  65. # Create device_trust_state table
  66. cursor.execute("""
  67. CREATE TABLE IF NOT EXISTS device_trust_state (
  68. device_id VARCHAR PRIMARY KEY,
  69. state INTEGER NOT NULL,
  70. FOREIGN KEY(device_id) REFERENCES device_keys(device_id) ON DELETE CASCADE
  71. );
  72. """)
  73. # Create olm_sessions table
  74. cursor.execute("""
  75. CREATE SEQUENCE IF NOT EXISTS olm_sessions_id_seq START 1;
  76. CREATE TABLE IF NOT EXISTS olm_sessions (
  77. id INTEGER PRIMARY KEY DEFAULT nextval('olm_sessions_id_seq'),
  78. account_id INTEGER NOT NULL,
  79. sender_key TEXT NOT NULL,
  80. session BLOB NOT NULL,
  81. session_id VARCHAR NOT NULL,
  82. creation_time TIMESTAMP NOT NULL,
  83. last_usage_date TIMESTAMP NOT NULL,
  84. FOREIGN KEY (account_id) REFERENCES accounts (id) ON DELETE CASCADE
  85. );
  86. """)
  87. # Create inbound_group_sessions table
  88. cursor.execute("""
  89. CREATE SEQUENCE IF NOT EXISTS inbound_group_sessions_id_seq START 1;
  90. CREATE TABLE IF NOT EXISTS inbound_group_sessions (
  91. id INTEGER PRIMARY KEY DEFAULT nextval('inbound_group_sessions_id_seq'),
  92. account_id INTEGER NOT NULL,
  93. session TEXT NOT NULL,
  94. fp_key TEXT NOT NULL,
  95. sender_key TEXT NOT NULL,
  96. room_id TEXT NOT NULL,
  97. UNIQUE (account_id, sender_key, fp_key, room_id),
  98. FOREIGN KEY (account_id) REFERENCES accounts(id) ON DELETE CASCADE
  99. );
  100. CREATE TABLE IF NOT EXISTS forwarded_chains (
  101. id INTEGER PRIMARY KEY,
  102. session_id INTEGER NOT NULL,
  103. sender_key TEXT NOT NULL,
  104. FOREIGN KEY (session_id) REFERENCES inbound_group_sessions(id) ON DELETE CASCADE
  105. );
  106. """)
  107. # Create outbound_group_sessions table
  108. cursor.execute("""
  109. CREATE TABLE IF NOT EXISTS outbound_group_sessions (
  110. id INTEGER PRIMARY KEY,
  111. account_id INTEGER NOT NULL,
  112. room_id VARCHAR NOT NULL,
  113. session_id VARCHAR NOT NULL UNIQUE,
  114. session BLOB NOT NULL,
  115. FOREIGN KEY(account_id) REFERENCES accounts(id) ON DELETE CASCADE
  116. );
  117. """)
  118. # Create outgoing_key_requests table
  119. cursor.execute("""
  120. CREATE TABLE IF NOT EXISTS outgoing_key_requests (
  121. id INTEGER PRIMARY KEY,
  122. account_id INTEGER NOT NULL,
  123. request_id TEXT NOT NULL,
  124. session_id TEXT NOT NULL,
  125. room_id TEXT NOT NULL,
  126. algorithm TEXT NOT NULL,
  127. FOREIGN KEY (account_id) REFERENCES accounts(id) ON DELETE CASCADE,
  128. UNIQUE (account_id, request_id)
  129. );
  130. """)
  131. # Create encrypted_rooms table
  132. cursor.execute("""
  133. CREATE TABLE IF NOT EXISTS encrypted_rooms (
  134. room_id TEXT NOT NULL,
  135. account_id INTEGER NOT NULL,
  136. PRIMARY KEY (room_id, account_id),
  137. FOREIGN KEY (account_id) REFERENCES accounts(id) ON DELETE CASCADE
  138. );
  139. """)
  140. # Create sync_tokens table
  141. cursor.execute("""
  142. CREATE TABLE IF NOT EXISTS sync_tokens (
  143. account_id INTEGER PRIMARY KEY,
  144. token TEXT NOT NULL,
  145. FOREIGN KEY (account_id) REFERENCES accounts(id) ON DELETE CASCADE
  146. );
  147. """)
  148. def _get_account(self):
  149. cursor = self.conn.cursor()
  150. cursor.execute(
  151. "SELECT * FROM accounts WHERE user_id = ? AND device_id = ?",
  152. (self.user_id, self.device_id),
  153. )
  154. account = cursor.fetchone()
  155. cursor.close()
  156. return account
  157. def _get_device(self, device):
  158. acc = self._get_account()
  159. if not acc:
  160. return None
  161. cursor = self.conn.cursor()
  162. cursor.execute(
  163. "SELECT * FROM device_keys WHERE user_id = ? AND device_id = ? AND account_id = ?",
  164. (device.user_id, device.id, acc[0]),
  165. )
  166. device_entry = cursor.fetchone()
  167. cursor.close()
  168. return device_entry
  169. # Implementing methods with DuckDB equivalents
  170. def verify_device(self, device):
  171. if self.is_device_verified(device):
  172. return False
  173. d = self._get_device(device)
  174. assert d
  175. cursor = self.conn.cursor()
  176. cursor.execute(
  177. "INSERT OR REPLACE INTO device_trust_state (device_id, state) VALUES (?, ?)",
  178. (d[0], TrustState.verified),
  179. )
  180. self.conn.commit()
  181. cursor.close()
  182. device.trust_state = TrustState.verified
  183. return True
  184. def unverify_device(self, device):
  185. if not self.is_device_verified(device):
  186. return False
  187. d = self._get_device(device)
  188. assert d
  189. cursor = self.conn.cursor()
  190. cursor.execute(
  191. "INSERT OR REPLACE INTO device_trust_state (device_id, state) VALUES (?, ?)",
  192. (d[0], TrustState.unset),
  193. )
  194. self.conn.commit()
  195. cursor.close()
  196. device.trust_state = TrustState.unset
  197. return True
  198. def is_device_verified(self, device):
  199. d = self._get_device(device)
  200. if not d:
  201. return False
  202. cursor = self.conn.cursor()
  203. cursor.execute(
  204. "SELECT state FROM device_trust_state WHERE device_id = ?", (d[0],)
  205. )
  206. trust_state = cursor.fetchone()
  207. cursor.close()
  208. if not trust_state:
  209. return False
  210. return trust_state[0] == TrustState.verified
  211. def blacklist_device(self, device):
  212. if self.is_device_blacklisted(device):
  213. return False
  214. d = self._get_device(device)
  215. assert d
  216. cursor = self.conn.cursor()
  217. cursor.execute(
  218. "INSERT OR REPLACE INTO device_trust_state (device_id, state) VALUES (?, ?)",
  219. (d[0], TrustState.blacklisted),
  220. )
  221. self.conn.commit()
  222. cursor.close()
  223. device.trust_state = TrustState.blacklisted
  224. return True
  225. def unblacklist_device(self, device):
  226. if not self.is_device_blacklisted(device):
  227. return False
  228. d = self._get_device(device)
  229. assert d
  230. cursor = self.conn.cursor()
  231. cursor.execute(
  232. "INSERT OR REPLACE INTO device_trust_state (device_id, state) VALUES (?, ?)",
  233. (d[0], TrustState.unset),
  234. )
  235. self.conn.commit()
  236. cursor.close()
  237. device.trust_state = TrustState.unset
  238. return True
  239. def is_device_blacklisted(self, device):
  240. d = self._get_device(device)
  241. if not d:
  242. return False
  243. cursor = self.conn.cursor()
  244. cursor.execute(
  245. "SELECT state FROM device_trust_state WHERE device_id = ?", (d[0],)
  246. )
  247. trust_state = cursor.fetchone()
  248. cursor.close()
  249. if not trust_state:
  250. return False
  251. return trust_state[0] == TrustState.blacklisted
  252. def ignore_device(self, device):
  253. if self.is_device_ignored(device):
  254. return False
  255. d = self._get_device(device)
  256. assert d
  257. cursor = self.conn.cursor()
  258. cursor.execute(
  259. "INSERT OR REPLACE INTO device_trust_state (device_id, state) VALUES (?, ?)",
  260. (d[0], int(TrustState.ignored.value)),
  261. )
  262. self.conn.commit()
  263. cursor.close()
  264. return True
  265. def ignore_devices(self, devices):
  266. for device in devices:
  267. self.ignore_device(device)
  268. def unignore_device(self, device):
  269. if not self.is_device_ignored(device):
  270. return False
  271. d = self._get_device(device)
  272. assert d
  273. cursor = self.conn.cursor()
  274. cursor.execute(
  275. "INSERT OR REPLACE INTO device_trust_state (device_id, state) VALUES (?, ?)",
  276. (d[0], TrustState.unset),
  277. )
  278. self.conn.commit()
  279. cursor.close()
  280. device.trust_state = TrustState.unset
  281. return True
  282. def is_device_ignored(self, device):
  283. d = self._get_device(device)
  284. if not d:
  285. return False
  286. cursor = self.conn.cursor()
  287. cursor.execute(
  288. "SELECT state FROM device_trust_state WHERE device_id = ?", (d[0],)
  289. )
  290. trust_state = cursor.fetchone()
  291. cursor.close()
  292. if not trust_state:
  293. return False
  294. return trust_state[0] == TrustState.ignored
  295. def load_device_keys(self):
  296. """Load all the device keys from the database.
  297. Returns DeviceStore containing the OlmDevices with the device keys.
  298. """
  299. store = DeviceStore()
  300. account = self.account_id
  301. if not account:
  302. return store
  303. with self.conn.cursor() as cur:
  304. cur.execute(
  305. "SELECT * FROM device_keys WHERE account_id = ?",
  306. (account,)
  307. )
  308. device_keys = cur.fetchall()
  309. for d in device_keys:
  310. cur.execute(
  311. "SELECT * FROM keys WHERE device_id = ?",
  312. (d["id"],)
  313. )
  314. keys = cur.fetchall()
  315. key_dict = {k["key_type"]: k["key"] for k in keys}
  316. store.add(
  317. OlmDevice(
  318. d["user_id"],
  319. d["device_id"],
  320. key_dict,
  321. display_name=d["display_name"],
  322. deleted=d["deleted"],
  323. )
  324. )
  325. return store
  326. def save_device_keys(self, device_keys):
  327. """Save the provided device keys to the database."""
  328. account = self.account_id
  329. assert account
  330. rows = []
  331. for user_id, devices_dict in device_keys.items():
  332. for device_id, device in devices_dict.items():
  333. rows.append(
  334. {
  335. "account_id": account,
  336. "user_id": user_id,
  337. "device_id": device_id,
  338. "display_name": device.display_name,
  339. "deleted": device.deleted,
  340. }
  341. )
  342. if not rows:
  343. return
  344. with self.conn.cursor() as cur:
  345. for idx in range(0, len(rows), 100):
  346. data = rows[idx: idx + 100]
  347. cur.executemany(
  348. "INSERT OR IGNORE INTO device_keys (account_id, user_id, device_id, display_name, deleted) VALUES (?, ?, ?, ?, ?)",
  349. [(r["account_id"], r["user_id"], r["device_id"],
  350. r["display_name"], r["deleted"]) for r in data]
  351. )
  352. for user_id, devices_dict in device_keys.items():
  353. for device_id, device in devices_dict.items():
  354. cur.execute(
  355. "UPDATE device_keys SET deleted = ? WHERE device_id = ?",
  356. (device.deleted, device_id)
  357. )
  358. for key_type, key in device.keys.items():
  359. cur.execute("""
  360. INSERT INTO keys (key_type, key, device_id) VALUES (?, ?, ?)
  361. ON CONFLICT (key_type, device_id) DO UPDATE SET key = ?
  362. """,
  363. (key_type, key, device_id, key)
  364. )
  365. self.conn.commit()
  366. def save_group_sessions(self, sessions):
  367. with self.conn.cursor() as cur:
  368. for session in sessions:
  369. cur.execute("""
  370. INSERT OR REPLACE INTO inbound_group_sessions (
  371. session_id, sender_key, signing_key, room_id, pickle, account_id
  372. ) VALUES (?, ?, ?, ?, ?, ?)
  373. """, (
  374. session.id,
  375. session.sender_key,
  376. session.signing_key,
  377. session.room_id,
  378. session.pickle,
  379. self.account_id
  380. ))
  381. self.conn.commit()
  382. def save_olm_sessions(self, sessions):
  383. with self.conn.cursor() as cur:
  384. for session in sessions:
  385. cur.execute("""
  386. INSERT OR REPLACE INTO olm_sessions (
  387. session_id, sender_key, pickle, account_id
  388. ) VALUES (?, ?, ?, ?)
  389. """, (
  390. session.id,
  391. session.sender_key,
  392. session.pickle,
  393. self.account_id
  394. ))
  395. self.conn.commit()
  396. def save_outbound_group_sessions(self, sessions):
  397. with self.conn.cursor() as cur:
  398. for session in sessions:
  399. cur.execute("""
  400. INSERT OR REPLACE INTO outbound_group_sessions (
  401. room_id, session_id, pickle, account_id
  402. ) VALUES (?, ?, ?, ?)
  403. """, (
  404. session.room_id,
  405. session.id,
  406. session.pickle,
  407. self.account_id
  408. ))
  409. self.conn.commit()
  410. def save_account(self, account: OlmAccount):
  411. with self.conn.cursor() as cur:
  412. cur.execute("""
  413. INSERT OR REPLACE INTO accounts (
  414. id, user_id, device_id, shared_account, pickle
  415. ) VALUES (?, ?, ?, ?, ?)
  416. """, (
  417. self.account_id,
  418. self.user_id,
  419. self.device_id,
  420. account.shared,
  421. account.pickle(self.pickle_key),
  422. ))
  423. self.conn.commit()
  424. def load_sessions(self):
  425. session_store = SessionStore()
  426. with self.conn.cursor() as cur:
  427. cur.execute("""
  428. SELECT
  429. os.sender_key, os.session, os.creation_time
  430. FROM
  431. olm_sessions os
  432. INNER JOIN
  433. accounts a ON os.account_id = a.id
  434. WHERE
  435. a.id = ?
  436. """, (self.account_id,))
  437. for row in cur.fetchall():
  438. sender_key, session_pickle, creation_time = row
  439. session = Session.from_pickle(
  440. session_pickle, creation_time, self.pickle_key)
  441. session_store.add(sender_key, session)
  442. return session_store
  443. def load_inbound_group_sessions(self):
  444. # type: () -> GroupSessionStore
  445. """Load all Olm sessions from the database.
  446. Returns:
  447. ``GroupSessionStore`` object, containing all the loaded sessions.
  448. """
  449. store = GroupSessionStore()
  450. account = self.account_id
  451. if not account:
  452. return store
  453. with self.conn.cursor() as cursor:
  454. cursor.execute(
  455. "SELECT * FROM inbound_group_sessions WHERE account_id = ?", (
  456. account,)
  457. )
  458. for row in cursor.fetchall():
  459. session = InboundGroupSession.from_pickle(
  460. row["session"],
  461. row["fp_key"],
  462. row["sender_key"],
  463. row["room_id"],
  464. self.pickle_key,
  465. [
  466. chain["sender_key"]
  467. for chain in cursor.execute(
  468. "SELECT sender_key FROM forwarded_chains WHERE session_id = ?",
  469. (row["id"],),
  470. )
  471. ],
  472. )
  473. store.add(session)
  474. return store
  475. def load_outgoing_key_requests(self):
  476. # type: () -> dict
  477. """Load all outgoing key requests from the database.
  478. Returns:
  479. ``OutgoingKeyRequestStore`` object, containing all the loaded key requests.
  480. """
  481. account = self.account_id
  482. if not account:
  483. return store
  484. with self.conn.cursor() as cur:
  485. cur.execute(
  486. "SELECT * FROM outgoing_key_requests WHERE account_id = ?",
  487. (account,)
  488. )
  489. rows = cur.fetchall()
  490. return {
  491. request.request_id: OutgoingKeyRequest.from_database(request)
  492. for request in rows
  493. }
  494. def load_encrypted_rooms(self):
  495. """Load the set of encrypted rooms for this account.
  496. Returns:
  497. ``Set`` containing room ids of encrypted rooms.
  498. """
  499. account = self.account_id
  500. if not account:
  501. return set()
  502. with self.conn.cursor() as cur:
  503. cur.execute(
  504. "SELECT room_id FROM encrypted_rooms WHERE account_id = ?",
  505. (account,)
  506. )
  507. rows = cur.fetchall()
  508. return {row["room_id"] for row in rows}
  509. def save_sync_token(self, token):
  510. """Save the given token"""
  511. account = self.account_id
  512. assert account
  513. with self.conn.cursor() as cur:
  514. cur.execute(
  515. "INSERT OR REPLACE INTO sync_tokens (account_id, token) VALUES (?, ?)",
  516. (account, token)
  517. )
  518. self.conn.commit()
  519. def save_encrypted_rooms(self, rooms):
  520. """Save the set of room ids for this account."""
  521. account = self.account_id
  522. assert account
  523. data = [(room_id, account) for room_id in rooms]
  524. with self.conn.cursor() as cur:
  525. for idx in range(0, len(data), 400):
  526. rows = data[idx: idx + 400]
  527. cur.executemany(
  528. "INSERT OR IGNORE INTO encrypted_rooms (room_id, account_id) VALUES (?, ?)",
  529. rows
  530. )
  531. self.conn.commit()
  532. def save_session(self, sender_key, session):
  533. """Save the provided Olm session to the database.
  534. Args:
  535. sender_key (str): The curve key that owns the Olm session.
  536. session (Session): The Olm session that will be pickled and
  537. saved in the database.
  538. """
  539. account = self.account_id
  540. assert account
  541. pickled_session = session.pickle(self.pickle_key)
  542. with self.conn.cursor() as cur:
  543. cur.execute(
  544. "INSERT OR REPLACE INTO olm_sessions (account_id, sender_key, session, session_id, creation_time, last_usage_date) VALUES (?, ?, ?, ?, ?, ?)",
  545. (account, sender_key, pickled_session, session.id,
  546. session.creation_time, session.use_time)
  547. )
  548. self.conn.commit()
  549. def save_inbound_group_session(self, session):
  550. """Save the provided Megolm inbound group session to the database.
  551. Args:
  552. session (InboundGroupSession): The session to save.
  553. """
  554. account = self.account_id
  555. assert account
  556. with self.conn.cursor() as cur:
  557. # Insert a new session or update the existing one
  558. query = """
  559. INSERT INTO inbound_group_sessions (account_id, sender_key, fp_key, room_id, session)
  560. VALUES (?, ?, ?, ?, ?)
  561. ON CONFLICT (account_id, sender_key, fp_key, room_id)
  562. DO UPDATE SET session = excluded.session
  563. """
  564. cur.execute(query, (account, session.sender_key,
  565. session.ed25519, session.room_id, session.pickle(self.pickle_key)))
  566. # Delete existing forwarded chains for the session
  567. delete_query = """
  568. DELETE FROM forwarded_chains WHERE session_id = (SELECT id FROM inbound_group_sessions WHERE account_id = ? AND sender_key = ? AND fp_key = ? AND room_id = ?)
  569. """
  570. cur.execute(
  571. delete_query, (account, session.sender_key, session.ed25519, session.room_id))
  572. # Insert new forwarded chains for the session
  573. insert_query = """
  574. INSERT INTO forwarded_chains (session_id, sender_key)
  575. VALUES ((SELECT id FROM inbound_group_sessions WHERE account_id = ? AND sender_key = ? AND fp_key = ? AND room_id = ?), ?)
  576. """
  577. for chain in session.forwarding_chain:
  578. cur.execute(
  579. insert_query, (account, session.sender_key, session.ed25519, session.room_id, chain))
  580. def add_outgoing_key_request(self, key_request):
  581. account_id = self.account_id
  582. with self.conn.cursor() as cursor:
  583. cursor.execute(
  584. """
  585. INSERT INTO outgoing_key_requests (account_id, request_id, session_id, room_id, algorithm)
  586. VALUES (?, ?, ?, ?, ?)
  587. ON CONFLICT (account_id, request_id) DO NOTHING
  588. """,
  589. (
  590. account_id,
  591. key_request.request_id,
  592. key_request.session_id,
  593. key_request.room_id,
  594. key_request.algorithm,
  595. )
  596. )