123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- import requests
- from datetime import datetime
- from enum import Enum
- from influxdb_client import Point
- class PiHole:
- def __init__(self, host, password):
- self.host = host
- if host.startswith("http"):
- self.url = host
- else:
- self.url = f"http://{host}"
- if password:
- json = self.post("auth", {'password': password}).json()
- if not 'session' in json or not json['session']['valid']:
- print(f"auth response: {json}")
- self.sid = json['session']['sid']
- self.csrf = json['session'].get('csrf', None)
- def post(self, endpoint, params={}):
- return requests.post(f"{self.url}/api/{endpoint}", json=params)
- def query(self, endpoint, params={}):
- return requests.get(f"{self.url}/api/{endpoint}", params=params)
-
- def request_all_queries(self, start: float, end: float):
- if not self.sid:
- raise Exception("Password required")
- params = {
- "from": int(start),
- "until": int(end),
- "length": 100000,
- "sid": self.sid
- }
- json = self.query("queries", params=params).json()
- if not 'queries' in json:
- print(f"API response: {json}")
- return json['queries']
- def request_summary(self):
- if not self.sid:
- raise Exception("Password required")
- params = {
- "sid": self.sid
- }
- json = self.query("stats/summary", params=params).json()
- return json
-
- def request_forward_destinations(self):
- if not self.sid:
- raise Exception("Password required")
- params = {
- "sid": self.sid
- }
- json = self.query("stats/upstreams", params=params).json()
- if not 'upstreams' in json:
- print(f"API response: {json}")
- return json['upstreams']
- def get_totals_for_influxdb(self):
- summary = self.request_summary()
- timestamp = datetime.now().astimezone()
- yield Point("domains") \
- .time(timestamp) \
- .tag("hostname", self.host) \
- .field("domain_count", summary['gravity']['domains_being_blocked']) \
- .field("unique_domains", summary['queries']['unique_domains']) \
- .field("forwarded", summary['queries']['forwarded']) \
- .field("cached", summary['queries']['cached'])
-
- yield Point("queries") \
- .time(timestamp) \
- .tag("hostname", self.host) \
- .field("queries", summary['queries']['total']) \
- .field("blocked", summary['queries']['blocked']) \
- .field("ads_percentage", summary['queries']['percent_blocked'])
-
- yield Point("clients") \
- .time(timestamp) \
- .tag("hostname", self.host) \
- .field("total_clients", summary['clients']['total']) \
- .field("unique_clients", summary['clients']['active']) \
- .field("total_queries", sum(summary['queries']['types'].values()))
-
- yield Point("other") \
- .time(timestamp) \
- .tag("hostname", self.host) \
- .field("gravity_last_update", summary['gravity']['last_update'])
- for key, value in summary['queries']['types'].items():
- yield Point("query_types") \
- .time(timestamp) \
- .tag("hostname", self.host) \
- .tag("query_type", key) \
- .field("value", float(value))
- forward_destinations = self.request_forward_destinations()
- for upstream in forward_destinations:
- yield Point("forward_destinations") \
- .time(timestamp) \
- .tag("hostname", self.host) \
- .tag("ip", upstream['ip']) \
- .tag("destination", upstream['name'] or upstream['ip']) \
- .field("value", float(upstream['count']))
-
- def get_queries_for_influxdb(self, query_date: datetime, sample_period: int):
- # Get all queries since last sample
- end_time = query_date.timestamp()
- start_time = end_time - sample_period + 1
- queries = self.request_all_queries(start_time, end_time)
- timestamp = datetime.now().astimezone()
- # we still need some stats from the summary
- summary = self.request_summary()
- yield Point("domains") \
- .time(timestamp) \
- .tag("hostname", self.host) \
- .field("domain_count", summary['gravity']['domains_being_blocked']) \
- .field("unique_domains", len(set(x['domain'] for x in queries))) \
- .field("forwarded", sum(1 for x in queries if x['status'].startswith("FORWARDED"))) \
- .field("cached", sum(1 for x in queries if x['status'].startswith("CACHED")))
-
- blocked_count = sum(1 for x in queries if x['status'].startswith("BLOCKED") or x['status'].startswith("BLACKLIST"))
- queries_point = Point("queries") \
- .time(timestamp) \
- .tag("hostname", self.host) \
- .field("queries", len(queries)) \
- .field("blocked", blocked_count) \
- .field("ads_percentage", blocked_count * 100.0 / max(1, len(queries)))
- yield queries_point
- clients = {}
- for query in queries:
- name = query['client']['name'] or query['client']['ip']
- group = clients.get(name, [])
- group.append(query)
- clients[name] = group
- for name, group in clients.items():
- blocked_count = sum(1 for x in group if x['status'].startswith("BLOCKED") or x['status'].startswith("BLACKLIST"))
- clients_point = Point("clients") \
- .time(timestamp) \
- .tag("hostname", self.host) \
- .tag("client", name) \
- .field("queries", len(group)) \
- .field("blocked", blocked_count) \
- .field("ads_percentage", blocked_count * 100.0 / max(1, len(group)))
- yield clients_point
- yield Point("other") \
- .time(timestamp) \
- .tag("hostname", self.host) \
- .field("gravity_last_update", summary['gravity']['last_update'])
- for key in summary['queries']['types']:
- yield Point("query_types") \
- .time(timestamp) \
- .tag("hostname", self.host) \
- .tag("query_type", key) \
- .field("queries", sum(1 for x in queries if x['type'] == key))
- destinations = {}
- for query in queries:
- if query['upstream']:
- name = query['upstream'].split('#')[0]
- group = clients.get(name, [])
- group.append(query)
- clients[name] = group
- for name, group in destinations.items():
- yield Point("forward_destinations") \
- .time(timestamp) \
- .tag("hostname", self.host) \
- .tag("destination", name) \
- .field("queries", len(group))
- def get_query_logs_for_influxdb(self, query_date: datetime, sample_period: int):
- end_time = query_date.timestamp()
- start_time = end_time - sample_period + 1
- for query in self.request_all_queries(start_time, end_time):
- p = Point("logs") \
- .time(datetime.fromtimestamp(query['time'])) \
- .tag("hostname", self.host) \
- .tag("query_type", query['type']) \
- .field("domain", query['domain']) \
- .tag("client", query['client']['name'] or query['client']['ip']) \
- .tag("status", query['status'][0] + query['status'][1:].lower()) \
- .tag("reply_type", query['reply']['type']) \
- .field("reply_time", query['reply']['time']) \
- .tag("dnssec", query['dnssec'][0] + query['dnssec'][1:].lower())
- if query['upstream']:
- p.tag("destination", query['upstream'].split('#')[0])
- yield p
- if __name__ == "__main__":
- import argparse
- parser = argparse.ArgumentParser(description='Export Pi-Hole statistics')
- parser.add_argument('--host', required=True, type=str, help='Pi-Hole host')
- parser.add_argument('--password', '-t', required=True, type=str, help='Pi-Hole API password')
- args = parser.parse_args()
- pihole = PiHole(host=args.host, password=args.password)
- points = list(pihole.get_queries_for_influxdb(datetime.now(), 600))
- for p in points:
- print(p._time, p._name, p._tags, p._fields)
|