commit 5e53382501 by subDesTagesMitExtraKaese, 2 years ago
3 changed files with 281 additions and 123 deletions:
  1. main.py (+46 -122)
  2. pihole.py (+233 -0)
  3. requirements.txt (+2 -1)

main.py (+46 -122)

@@ -6,143 +6,67 @@ import logging
 
 from influxdb_client import InfluxDBClient, Point
 from influxdb_client.client.write_api import SYNCHRONOUS
-from pihole import PiHole # PiHole API Wrapper
+from pihole import PiHole
 
 logger = logging.Logger('pihole-to-influxdb')
 logger.addHandler(logging.StreamHandler(sys.stdout))
 
 try:
-    # optional Logger Settings
-    logging.basicConfig(level=os.getenv("LOGLEVEL", "DEBUG"))
+  # optional Logger Settings
+  logging.basicConfig(level=os.getenv("LOGLEVEL", "DEBUG"))
 
-    # InfluxDB Settings
-    DB_URL = os.environ['INFLUX_DB_URL']
-    DB_ORG = os.environ['INFLUX_DB_ORG']
-    DB_TOKEN = os.environ['INFLUX_DB_TOKEN']
-    DB_BUCKET = os.environ['INFLUX_DB_BUCKET']
+  # InfluxDB Settings
+  DB_URL = os.environ['INFLUX_DB_URL']
+  DB_ORG = os.environ['INFLUX_DB_ORG']
+  DB_TOKEN = os.environ['INFLUX_DB_TOKEN']
+  DB_BUCKET = os.environ['INFLUX_DB_BUCKET']
 
-    # PiHole Settings
-    PIHOLE_HOSTNAME = str(os.environ['PIHOLE_HOSTNAME'])
-    TEST_INTERVAL = int(os.environ['PIHOLE_INTERVAL'])
+  # PiHole Settings
+  PIHOLE_HOSTNAME = str(os.environ['PIHOLE_HOSTNAME'])
+  QUERY_LIVE = os.getenv('PIHOLE_QUERY_LIVE', 'true').lower() in ('1', 'true', 'yes')  # bool() of any non-empty string is True
+  QUERY_INTERVAL = int(os.environ['PIHOLE_INTERVAL'])
 
-    # optional Pi-hole authentication
-    AUTHENTICATION_TOKEN = os.getenv('PIHOLE_AUTHENTICATION', None)
+  # optional Pi-hole authentication
+  AUTHENTICATION_TOKEN = os.getenv('PIHOLE_AUTHENTICATION', None)
 
 except KeyError as e:
-    logger.fatal('Missing environment variable: {}'.format(e))
-    sys.exit(1)
+  logger.fatal('Missing environment variable: {}'.format(e))
+  sys.exit(1)
 
 influxdb_client = InfluxDBClient(DB_URL, DB_TOKEN, org=DB_ORG)
-
-def get_data_for_influxdb(pihole: PiHole, timestamp: datetime.datetime):
-    return [
-        Point("domains") \
-            .time(timestamp) \
-            .tag("hostname", PIHOLE_HOSTNAME) \
-            .field("domain_count", int(pihole.domain_count.replace(',',''))) \
-            .field("unique_domains", int(pihole.unique_domains.replace(',',''))) \
-            .field("forwarded", int(pihole.forwarded.replace(',',''))) \
-            .field("cached", int(pihole.cached.replace(',',''))),
-
-        Point("queries") \
-            .time(timestamp) \
-            .tag("hostname", PIHOLE_HOSTNAME) \
-            .field("queries", int(pihole.queries.replace(',',''))) \
-            .field("blocked", int(pihole.blocked.replace(',',''))) \
-            .field("ads_percentage", float(pihole.ads_percentage)),
-
-        Point("clients") \
-            .time(timestamp) \
-            .tag("hostname", PIHOLE_HOSTNAME) \
-            .field("total_clients", int(pihole.total_clients.replace(',',''))) \
-            .field("unique_clients", int(pihole.unique_clients.replace(',',''))) \
-            .field("total_queries", int(pihole.total_queries.replace(',',''))),
-
-        Point("other") \
-            .time(timestamp) \
-            .tag("hostname", PIHOLE_HOSTNAME) \
-            .field("status", pihole.status == 'enabled') \
-            .field("gravity_last_update", pihole.gravity_last_updated['absolute'])
-    ]
-
-def get_authenticated_data_for_influxdb(pihole: PiHole, timestamp: datetime.datetime):
-    query_type_point = Point("query_types") \
-        .time(timestamp) \
-        .tag("hostname", PIHOLE_HOSTNAME)
-    
-    for key, value in pihole.query_types.items():
-        query_type_point.field(key, float(value))
-
-    forward_destinations_point = Point("forward_destinations") \
-        .time(timestamp) \
-        .tag("hostname", PIHOLE_HOSTNAME)
-    
-    for key, value in pihole.forward_destinations['forward_destinations'].items():
-        forward_destinations_point.field(key.split('|')[0], value)
-
-    return [
-        query_type_point,
-        forward_destinations_point
-    ]
+pihole = PiHole(PIHOLE_HOSTNAME, AUTHENTICATION_TOKEN)
 
 class Auth(object):
-    def __init__(self, token):
-        # PiHole's web token is just a double sha256 hash of the utf8 encoded password
-        self.token = token
-        self.auth_timestamp = time.time()
+  def __init__(self, token):
+    # Pi-hole's web token is just a double sha256 hash of the UTF-8 encoded password (see the sketch after this diff)
+    self.token = token
+    self.auth_timestamp = time.time()
 
 def main():
-    # pihole ctor has side effects, so we create it locally
-    pihole = PiHole(PIHOLE_HOSTNAME)
-
-    write_api = influxdb_client.write_api(write_options=SYNCHRONOUS)
-
-    USE_AUTHENTICATION = False if AUTHENTICATION_TOKEN == None else True
-
-    if USE_AUTHENTICATION:
-        try:
-            pihole.auth_data = Auth(AUTHENTICATION_TOKEN)
-            pihole.refresh()
-            logger.info('Pi-Hole authentication successful')
-        except Exception as e:
-            logger.exception('Pi-Hole authentication failed')
-            USE_AUTHENTICATION = False
-            raise
-    
-    next_update = time.monotonic()
-
-    while True:
-        try:
-
-            pihole.refresh()
-            timestamp = datetime.datetime.now()
-            data = get_data_for_influxdb(pihole, timestamp)
-
-            if USE_AUTHENTICATION:
-                authenticated_data = get_authenticated_data_for_influxdb(pihole, timestamp)
-                try:
-                    write_api.write(bucket=DB_BUCKET, record=authenticated_data)
-                except Exception as e:
-                    logger.exception('Failed to write authenticated data to InfluxDB')
-
-            try:
-                write_api.write(bucket=DB_BUCKET, record=data)
-                logger.debug('Wrote data to InfluxDB')
-                sleep_duration = TEST_INTERVAL
-            except Exception as e:
-                logger.exception('Failed to write data to InfluxDB')
-                # Sleep at most two minutes
-                sleep_duration = min(TEST_INTERVAL, 120)
-
-        except Exception as e:
-            logger.exception('Failed to get data from Pi-Hole')
-            # Sleep at most two minutes
-            sleep_duration = min(TEST_INTERVAL, 120)
-
-        next_update = next_update + sleep_duration
-        logger.debug("Now sleeping for {}".format(datetime.timedelta(seconds=sleep_duration)))
-        time.sleep(max(0, next_update - time.monotonic()))
+  write_api = influxdb_client.write_api(write_options=SYNCHRONOUS)
+  next_update = time.monotonic()
+
+  while True:
+    try:
+      timestamp = datetime.datetime.now()
+      if QUERY_LIVE:
+        data = list(pihole.get_queries_for_influxdb(timestamp, QUERY_INTERVAL))
+      else:
+        data = list(pihole.get_totals_for_influxdb())
+
+      logger.debug('Writing {} points to InfluxDB'.format(len(data)))
+      write_api.write(bucket=DB_BUCKET, record=data)
+      sleep_duration = QUERY_INTERVAL
+
+    except Exception as e:
+      logger.exception('Failed to transfer data from Pi-Hole to InfluxDB')
+      # Sleep at most two minutes
+      sleep_duration = min(QUERY_INTERVAL, 120)
+
+    next_update = next_update + sleep_duration
+    logger.debug("Now sleeping for {}".format(datetime.timedelta(seconds=sleep_duration)))
+    time.sleep(max(0, next_update - time.monotonic()))
 
 if __name__ == '__main__':
-    logger.info('PiHole Data Logger to InfluxDB')
-    main()
+  logger.info('PiHole Data Logger to InfluxDB')
+  main()
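
The Auth comment in main.py above describes the web token as a double sha256 hash of the UTF-8 encoded password. A minimal sketch of that derivation, assuming Pi-hole v5 semantics (the helper name pihole_token is ours, not part of the commit):

import hashlib

def pihole_token(password: str) -> str:
  # hash the UTF-8 password twice, hex-encoding between rounds
  first = hashlib.sha256(password.encode("utf-8")).hexdigest()
  return hashlib.sha256(first.encode("utf-8")).hexdigest()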

pihole.py (+233 -0)

@@ -0,0 +1,233 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+
+import requests
+from datetime import datetime, timedelta
+from enum import Enum
+from influxdb_client import Point
+from pandas import DataFrame
+
+class QueryStati(Enum):
+  Blocked = 1
+  Forwarded = 2
+  Cached = 3
+  Wildcard = 4
+  Unknown = 5
+
+class PiHole:
+  def __init__(self, host, token):
+    self.host = host
+    self.token = token
+
+  def query(self, endpoint, params={}):
+    url = f"http://{self.host}/admin/{endpoint}.php"
+    return requests.get(url, params=params)
+  
+  def request_all_queries(self, start: float, end: float):
+    """
+    keys[]: time, query_type, domain, client, status, destination, reply_type, reply_time, dnssec
+    """
+    if not self.token:
+      raise Exception("Token required")
+    params = {
+      "getAllQueries": "",
+      "from": int(start),
+      "until": int(end),
+      "auth": self.token
+      }
+    json = self.query("api_db", params=params).json()
+    if json:
+      return json['data']
+    else:
+      return []
+
+  def request_summary(self):
+    """
+    keys: 
+      - domains_being_blocked
+      - dns_queries_today
+      - ads_blocked_today
+      - ads_percentage_today
+      - unique_domains
+      - queries_forwarded
+      - queries_cached
+      - clients_ever_seen
+      - unique_clients
+      - dns_queries_all_types
+      - reply_UNKNOWN
+      - reply_NODATA
+      - reply_NXDOMAIN
+      - reply_CNAME
+      - reply_IP
+      - reply_DOMAIN
+      - reply_RRNAME
+      - reply_SERVFAIL
+      - reply_REFUSED
+      - reply_NOTIMP
+      - reply_OTHER
+      - reply_DNSSEC
+      - reply_NONE
+      - reply_BLOB
+      - dns_queries_all_replies
+      - privacy_level
+      - status
+      - gravity_last_updated: file_exists, absolute, relative
+    """
+    json = self.query("api").json()
+    return json
+  
+  def request_forward_destinations(self):
+    if not self.token:
+      raise Exception("Token required")
+    params = {
+      "getForwardDestinations": "",
+      "auth": self.token
+      }
+    json = self.query("api", params=params).json()
+    if json:
+      return json['forward_destinations']
+    else:
+      return {}
+
+  def request_query_types(self):
+    if not self.token:
+      raise Exception("Token required")
+    params = {
+      "getQueryTypes": "",
+      "auth": self.token
+      }
+    json = self.query("api", params=params).json()
+    if json:
+      return json['querytypes']
+    else:
+      return {}
+
+  def get_totals_for_influxdb(self):
+    summary = self.request_summary()
+    timestamp = datetime.now().astimezone()
+    yield Point("domains") \
+      .time(timestamp) \
+      .tag("hostname", self.host) \
+      .field("domain_count", summary['domains_being_blocked']) \
+      .field("unique_domains", summary['unique_domains']) \
+      .field("forwarded", summary['queries_forwarded']) \
+      .field("cached", summary['queries_cached'])
+      
+    yield Point("queries") \
+      .time(timestamp) \
+      .tag("hostname", self.host) \
+      .field("queries", summary['dns_queries_today']) \
+      .field("blocked", summary['ads_blocked_today']) \
+      .field("ads_percentage", summary['ads_percentage_today'])
+      
+    yield Point("clients") \
+      .time(timestamp) \
+      .tag("hostname", self.host) \
+      .field("total_clients", summary['clients_ever_seen']) \
+      .field("unique_clients", summary['unique_clients']) \
+      .field("total_queries", summary['dns_queries_all_types'])
+      
+    yield Point("other") \
+      .time(timestamp) \
+      .tag("hostname", self.host) \
+      .field("status", summary['status'] == 'enabled') \
+      .field("gravity_last_update", summary['gravity_last_updated']['absolute'])
+
+    if self.token:
+      query_types = self.request_query_types()
+      query_type_point = Point("query_types").time(timestamp).tag("hostname", self.host)
+      for key, value in query_types.items():
+        query_type_point.field(key, float(value))
+      yield query_type_point
+
+      forward_destinations = self.request_forward_destinations()
+      forward_destinations_point = Point("forward_destinations").time(timestamp).tag("hostname", self.host)
+      for key, value in forward_destinations.items():
+        forward_destinations_point.field(key.split('|')[0], value)
+      yield forward_destinations_point
+  
+  def get_queries_for_influxdb(self, query_date: datetime, sample_period: int):
+    # Get all queries since last sample
+    end_time = query_date.timestamp()
+    start_time = end_time - sample_period + 1
+    queries = self.request_all_queries(start_time, end_time)
+    timestamp = datetime.now().astimezone()
+    df = DataFrame(queries, columns=['time', 'query_type', 'domain', 'client', 'status', 'destination', 'reply_type', 'reply_time', 'dnssec'])
+
+    # we still need some stats from the summary
+    summary = self.request_summary()
+
+    yield Point("domains") \
+      .time(timestamp) \
+      .tag("hostname", self.host) \
+      .field("domain_count", summary['domains_being_blocked']) \
+      .field("unique_domains", len(df.groupby('domain'))) \
+      .field("forwarded", len(df[df['status'] == QueryStati.Forwarded.value])) \
+      .field("cached", len(df[df['status'] == QueryStati.Cached.value]))
+    
+    blocked_count = len(df[(df['status'] == QueryStati.Blocked.value) | (df['status'] == QueryStati.Wildcard.value)])
+    queries_point = Point("queries") \
+      .time(timestamp) \
+      .tag("hostname", self.host) \
+      .field("queries", len(df)) \
+      .field("blocked", blocked_count) \
+      .field("ads_percentage", blocked_count * 100.0 / max(1, len(df)))
+    yield queries_point
+
+    for key, client_df in df.groupby('client'):
+      blocked_count = len(client_df[(client_df['status'] == QueryStati.Blocked.value) | (client_df['status'] == QueryStati.Wildcard.value)])
+      clients_point = Point("clients") \
+        .time(timestamp) \
+        .tag("hostname", self.host) \
+        .tag("client", key) \
+        .field("queries", len(client_df)) \
+        .field("blocked", blocked_count) \
+        .field("ads_percentage", blocked_count * 100.0 / max(1, len(client_df)))
+      yield clients_point
+
+    yield Point("other") \
+      .time(timestamp) \
+      .tag("hostname", self.host) \
+      .field("status", summary['status'] == 'enabled') \
+      .field("gravity_last_update", summary['gravity_last_updated']['absolute'])
+
+    query_type_point = Point("query_types").time(timestamp).tag("hostname", self.host)
+    for key, group_df in df.groupby('query_type'):
+      query_type_point.field(key, len(group_df))
+    yield query_type_point
+
+    forward_destinations_point = Point("forward_destinations").time(timestamp).tag("hostname", self.host)
+    for key, group_df in df.groupby('destination'):
+      forward_destinations_point.field(key.split('|')[0], len(group_df))
+    yield forward_destinations_point
+
+  def get_logs_for_influxdb(self, query_date: datetime, sample_period: int):
+    end_time = query_date.timestamp()
+    start_time = end_time - sample_period + 1
+
+    for data in self.request_all_queries(start_time, end_time):
+      timestamp, query_type, domain, client, status, destination, reply_type, reply_time, dnssec = data
+      p = Point("logs") \
+        .time(datetime.fromtimestamp(timestamp)) \
+        .tag("hostname", self.host) \
+        .tag("query_type", query_type) \
+        .field("domain", domain) \
+        .tag("client", client) \
+        .tag("status", QueryStati(status)) \
+        .tag("dnssec", dnssec != 0) \
+        .field("reply_time", reply_time)
+      if destination:
+        p.tag("destination", destination)
+      yield p
+
+if __name__ == "__main__":
+  import argparse
+  parser = argparse.ArgumentParser(description='Export Pi-Hole statistics')
+  parser.add_argument('--host', required=True, type=str, help='Pi-Hole host')
+  parser.add_argument('--token', '-t', required=True, type=str, help='Pi-Hole API token')
+  args = parser.parse_args()
+  pihole = PiHole(host=args.host, token=args.token)
+
+  points = list(pihole.get_queries_for_influxdb(datetime.now(), 600))
+  for p in points:
+    print(p._time, p._name, p._tags, p._fields)
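
The per-client statistics in pihole.py above are computed with a pandas groupby over the raw query rows. A toy sketch with made-up rows, using the column layout from request_all_queries and the QueryStati values Blocked=1, Wildcard=4:

from pandas import DataFrame

# made-up rows; layout: time, query_type, domain, client, status,
# destination, reply_type, reply_time, dnssec
rows = [
  [1700000000, "A", "example.com", "192.168.0.2", 2, "8.8.8.8#53", 4, 10, 0],
  [1700000001, "A", "ads.example", "192.168.0.2", 1, "", 3, 0, 0],
  [1700000002, "AAAA", "example.com", "192.168.0.3", 3, "", 4, 1, 0],
]
df = DataFrame(rows, columns=['time', 'query_type', 'domain', 'client', 'status',
                              'destination', 'reply_type', 'reply_time', 'dnssec'])
for client, client_df in df.groupby('client'):
  blocked = len(client_df[client_df['status'].isin((1, 4))])  # Blocked or Wildcard
  print(client, len(client_df), blocked)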

requirements.txt (+2 -1)

@@ -1,2 +1,3 @@
 influxdb-client
-PiHole-api
+pandas
+requests
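
With the PiHole-api package replaced by the in-repo pihole.py, pandas and requests are the only new dependencies. As a hedged example, the collector could be installed and started like this (all values are placeholders):

pip install -r requirements.txt

export INFLUX_DB_URL=http://localhost:8086
export INFLUX_DB_ORG=my-org
export INFLUX_DB_TOKEN=my-token
export INFLUX_DB_BUCKET=pihole
export PIHOLE_HOSTNAME=192.168.0.2
export PIHOLE_INTERVAL=60
export PIHOLE_QUERY_LIVE=true        # false falls back to summary totals
export PIHOLE_AUTHENTICATION=...     # optional; live query data needs the API token
python main.py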