wifeye/wifeye.py
2024-11-17 14:33:33 -08:00

200 lines
7.2 KiB
Python

import re
import requests
import math
import json
import time
import os
import yaml
from lxml import html
from datetime import datetime
from rich.console import Console
from rich.table import Table
# Read the router credentials and the polling interval from the YAML config.
with open('conf.yml', 'r') as file:
    config = yaml.safe_load(file)

ADMIN_USERNAME = config['ADMIN_USERNAME']
ADMIN_PASSWORD = config['ADMIN_PASSWORD']
REFRESH_TIME = config['REFRESH_TIME']  # passed to time.sleep() between polls

# One HTTP session is shared by every request so the login carries over.
session = requests.Session()
console = Console()

# Router address and the two pages this script talks to.
base_url = 'http://10.0.0.1'
login_page_scope = '/check.php'
devices_page_scope = '/connected_devices_computers.php'

# Form fields posted to the login page.
payload = dict(username=ADMIN_USERNAME, password=ADMIN_PASSWORD)
# log in
def login(timeout=10):
    """Post the admin credentials to the router's login page.

    Sends ``payload`` to ``base_url + login_page_scope`` through the shared
    ``session`` and prints the outcome.

    NOTE(review): ``response.ok`` only reflects the HTTP status code. Whether
    the router answers rejected credentials with an error status is not
    visible from this file -- confirm.

    Args:
        timeout: Seconds to wait for the router before giving up. Without a
            timeout the request could block indefinitely.

    Returns:
        True when the request completed with a successful HTTP status,
        False when the status indicates an error or the request itself failed.
    """
    try:
        response = session.post(
            base_url + login_page_scope,
            data=payload,
            timeout=timeout,
        )
    except requests.RequestException as error:
        # An unreachable router is a failed login, not a crash.
        print(f"Login failed: {error}")
        return False

    if not response.ok:
        print("Login failed")
        return False

    print("Login successful")
    return True
# fetch devices information
def _first_text(node, path, default=""):
    """Return the first result of XPath ``path`` under ``node``, stripped.

    ``default`` is returned when the expression matches nothing, so a missing
    table cell does not raise IndexError.
    """
    matches = node.xpath(path)
    return matches[0].strip() if matches else default


def _parse_device_info(dd_nodes):
    """Build a ``{key: value}`` dict from the ``<dd>`` nodes of a device-info list.

    The key is the first text found inside the entry's ``<b>`` child and the
    value is the entry's second text node. Keys are lower-cased with spaces
    replaced by underscores. Entries missing either part are ignored.
    """
    info = {}
    for dd in dd_nodes:
        labels = dd.xpath('./b//text()')
        texts = dd.xpath('.//text()')
        key = labels[0].strip() if labels else None
        value = texts[1].strip() if len(texts) > 1 else None
        if key and value:
            info[key.lower().replace(" ", "_")] = value
    return info


def _parse_frequency(connection_type):
    """Return the first number in ``connection_type`` multiplied by 1000, as int.

    The number is presumably a band in GHz, making the result MHz. Returns
    None when the text is empty or contains no number at all, instead of
    raising IndexError.
    """
    numbers = re.findall(r'\d+\.?\d*', connection_type) if connection_type else []
    if not numbers:
        return None
    return int(float(numbers[0]) * 1000)


def _estimate_distance(rssi, frequency):
    """Estimate a distance from signal strength and frequency.

    The expression has the shape of the free-space path loss relation with
    the 32.44 constant and the frequency in MHz; 30 is presumably the assumed
    transmit power in dBm.

    NOTE(review): with the 32.44 constant the free-space formula conventionally
    yields kilometres, while the table header in this file labels the column
    "Distance (m)" -- confirm the intended unit.

    Returns None when either input is missing or the frequency is 0, because
    log10(0) is undefined.
    """
    if rssi is None or not frequency:
        return None
    return round(10 ** ((30 - rssi - (20 * math.log10(frequency)) - 32.44) / 20), 2)


def fetch_devices_data(timeout=10):
    """Download the router's connected-devices page and parse both tables.

    Args:
        timeout: Seconds to wait for the router's response.

    Returns:
        A dict with two lists, ``"online_devices"`` and ``"offline_devices"``.
        Every entry has the keys ``name``, ``online``, ``ip_type``, ``rssi``,
        ``network``, ``frequency``, ``distance`` and ``device_info``; offline
        entries carry None for everything measured. Online devices are ordered
        by ascending distance, with devices of unknown distance last.

    Raises:
        requests.RequestException: If the HTTP request fails or times out.
    """
    devices_url = base_url + devices_page_scope
    devices_response = session.get(devices_url, timeout=timeout)
    tree = html.fromstring(devices_response.content)

    devices_data = {
        "online_devices": [],
        "offline_devices": []
    }

    # process online devices ([1:-1] drops the first and the last table row)
    for row in tree.xpath('//div[@id="online-private"]/table//tr')[1:-1]:
        host_name = _first_text(row, './/td[@headers="host-name"]/a//text()', default=None)
        if host_name is None:
            # A row without a host-name link cannot be identified or tracked,
            # so skip it instead of raising IndexError.
            continue

        dhcp_or_reserved = _first_text(row, './/td[@headers="dhcp-or-reserved"]//text()')
        rssi_text = _first_text(row, './/td[@headers="rssi-level"]//text()')
        try:
            rssi = int(float(rssi_text.replace(" dBm", "")))
        except ValueError:
            print(f"Invalid RSSI value for {host_name}: {rssi_text}")
            rssi = None

        connection_type = _first_text(row, './/td[@headers="connection-type"]//text()')
        frequency = _parse_frequency(connection_type)

        devices_data["online_devices"].append({
            "name": host_name,
            "online": True,
            "ip_type": dhcp_or_reserved,
            "rssi": rssi,
            "network": connection_type,
            "frequency": frequency,
            "distance": _estimate_distance(rssi, frequency),
            "device_info": _parse_device_info(
                row.xpath('.//td[@headers="host-name"]//div[@class="device-info"]/dl/dd')
            ),
        })

    # process offline devices
    for row in tree.xpath('//div[@id="offline-private"]/table//tr')[1:-1]:
        host_name = _first_text(
            row, './/td[@headers="offline-device-host-name"]/a//text()', default=None
        )
        if host_name is None:
            continue

        devices_data["offline_devices"].append({
            "name": host_name,
            "online": False,
            "ip_type": None,
            "rssi": None,
            "network": None,
            "frequency": None,
            "distance": None,
            "device_info": _parse_device_info(
                row.xpath('.//td[@headers="offline-device-host-name"]//div[@class="device-info"]/dl/dd')
            ),
        })

    # sort online devices by distance; the sort is stable, so devices with an
    # unknown distance keep their page order and end up after the known ones
    devices_data["online_devices"].sort(
        key=lambda device: (device["distance"] is None, device["distance"] or 0)
    )
    return devices_data
# format keys
def format_keys(data):
    """Return a new dict with every key lower-cased and spaces turned into underscores.

    Values are carried over untouched. When two keys normalise to the same
    name, the later one wins.
    """
    normalised = {}
    for key, value in data.items():
        normalised[key.lower().replace(" ", "_")] = value
    return normalised
# log data
def log_devices_data(devices_data):
    """Append a timestamped snapshot of all devices to ``device_data.log``.

    The snapshot is written as a single JSON line holding the online devices
    followed by the offline ones, each passed through ``format_keys``.
    """
    stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    every_device = [*devices_data["online_devices"], *devices_data["offline_devices"]]

    # format keys for each device entry
    snapshot = {
        "timestamp": stamp,
        "data": list(map(format_keys, every_device)),
    }

    with open("device_data.log", "a") as log_file:
        log_file.write(f"{json.dumps(snapshot)}\n")
# log status changes
def log_device_status_change(device_name, online):
    """Append one JSON line to ``device_list.log`` recording a device's status.

    The line holds the current timestamp, the device name and its online flag.
    """
    record = {
        "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "name": device_name,
        "online": online,
    }
    with open("device_list.log", "a") as log_file:
        line = json.dumps(format_keys(record))
        log_file.write(f"{line}\n")
# display devices
def display_devices(devices_data, current_device_status):
    """Clear the terminal and print all devices as a fixed-width table.

    Online devices are printed first, then offline ones. Values that are None
    are shown as ``null``. The "Last Activity" column is looked up by device
    name in ``current_device_status``.
    """
    clear_command = 'clear'
    if os.name == 'nt':
        clear_command = 'cls'
    os.system(clear_command)

    # Column titles and their widths, in print order.
    titles = ('Name', 'Online', 'IP Type', 'RSSI', 'Network',
              'Frequency (MHz)', 'Distance (m)', 'Last Activity')
    widths = (20, 10, 15, 10, 20, 20, 15, 20)

    def render(cells):
        # Left-align every cell in its column and glue the columns together.
        return "".join(f"{cell:<{width}}" for cell, width in zip(cells, widths))

    def or_null(value, as_text=False):
        # Missing values are displayed as the literal text 'null'.
        if value is None:
            return 'null'
        return str(value) if as_text else value

    print(render(titles))

    # add online devices, then offline devices
    for group in ("online_devices", "offline_devices"):
        for device in devices_data[group]:
            last_activity = current_device_status.get(device["name"], {}).get("last_activity", None)
            print(render((
                device['name'],
                str(device['online']),
                or_null(device['ip_type']),
                or_null(device['rssi'], as_text=True),
                or_null(device['network']),
                or_null(device['frequency'], as_text=True),
                or_null(device['distance'], as_text=True),
                or_null(last_activity),
            )))
# main
def _refresh_device_status(devices, previous_status):
    """Return the status table for this poll and log every status change.

    Args:
        devices: Device entries (dicts with ``name`` and ``online``) seen in
            the current poll.
        previous_status: Mapping of device name to
            ``{"online": bool, "last_activity": str}`` from the previous poll.

    Returns:
        A new mapping containing only the devices seen in this poll. A device
        seen for the first time, or whose online flag flipped, gets a fresh
        ``last_activity`` timestamp and is written to the status-change log.
    """
    updated_status = {}
    for device in devices:
        device_name = device["name"]
        online = device["online"]
        entry = previous_status.get(device_name)

        if entry is None:
            # First sighting: record it and log the initial state.
            entry = {
                "online": online,
                "last_activity": datetime.now().strftime("%y-%m-%d %H:%M:%S")
            }
            log_device_status_change(device_name, online)
        elif entry["online"] != online:
            # log only on status change
            entry["last_activity"] = datetime.now().strftime("%y-%m-%d %H:%M:%S")
            entry["online"] = online
            log_device_status_change(device_name, online)

        updated_status[device_name] = entry
    return updated_status


if login():
    current_device_status = {}
    try:
        while True:
            try:
                devices_data = fetch_devices_data()
            except requests.RequestException as error:
                # A transient network failure should not end the monitoring
                # loop; report it and try again on the next cycle.
                print(f"Failed to fetch devices: {error}")
                devices_data = None

            if devices_data:
                log_devices_data(devices_data)
                current_device_status = _refresh_device_status(
                    devices_data["online_devices"] + devices_data["offline_devices"],
                    current_device_status,
                )
                display_devices(devices_data, current_device_status)

            # refresh
            time.sleep(REFRESH_TIME)
    except KeyboardInterrupt:
        print("Stopped by user.")
    finally:
        session.close()