"""Poll a router's connected-devices page and track device presence.

The script logs in to the admin interface at ``base_url``, scrapes the
connected-devices page every ``REFRESH_TIME`` seconds, estimates a rough
distance for each online device from its RSSI, appends every snapshot to
``device_data.log``, appends online/offline transitions to
``device_list.log`` and prints a plain-text table to the terminal.

Credentials and the refresh interval are read from ``conf.yml``.
"""
import json
import math
import os
import re
import time
from datetime import datetime

import requests
import yaml
from lxml import html
from rich.console import Console
from rich.table import Table

with open('conf.yml', 'r') as file:
    config = yaml.safe_load(file)

ADMIN_USERNAME = config['ADMIN_USERNAME']
ADMIN_PASSWORD = config['ADMIN_PASSWORD']
REFRESH_TIME = config['REFRESH_TIME']

session = requests.Session()
console = Console()

base_url = 'http://10.0.0.1'
login_page_scope = '/check.php'
devices_page_scope = '/connected_devices_computers.php'

payload = {
    'username': ADMIN_USERNAME,
    'password': ADMIN_PASSWORD
}

# Seconds to wait for the router before giving up on a request; without a
# timeout a stalled connection would block the polling loop forever.
REQUEST_TIMEOUT = 10

# Transmit power (dBm) assumed by the distance estimate.
ASSUMED_TX_POWER_DBM = 30

# Timestamp format used in the log files.
LOG_TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S"
# Shorter (two-digit year) format used for the "Last Activity" column.
ACTIVITY_TIMESTAMP_FORMAT = "%y-%m-%d %H:%M:%S"

# First number (integer or decimal) found in the connection-type text.
_NUMBER_RE = re.compile(r'\d+\.?\d*')


# log in
def login():
    """Post the admin credentials to the login page.

    Returns:
        True when the router answers with a successful HTTP status,
        False when it answers with an error status or cannot be reached.
    """
    try:
        response = session.post(
            base_url + login_page_scope, data=payload, timeout=REQUEST_TIMEOUT
        )
    except requests.RequestException as exc:
        print(f"Login failed: {exc}")
        return False

    if response.ok:
        print("Login successful")
        return True

    print("Login failed")
    return False


def _first_text(node, path):
    """Return the first result of XPath *path* under *node*, stripped, or None."""
    matches = node.xpath(path)
    return matches[0].strip() if matches else None


def _parse_device_info(dd_nodes):
    """Turn the ``<dd>`` detail nodes of a device into a ``{key: value}`` dict.

    The key is the text of the ``<b>`` child, lower-cased with spaces replaced
    by underscores; the value is the second text node of the ``<dd>``.
    Entries with a missing or empty key or value are dropped.
    """
    info_dict = {}
    for node in dd_nodes:
        label_texts = node.xpath('./b//text()')
        all_texts = node.xpath('.//text()')
        key = label_texts[0].strip() if label_texts else None
        value = all_texts[1].strip() if len(all_texts) > 1 else None
        if key and value:
            info_dict[key.lower().replace(" ", "_")] = value
    return info_dict


def _parse_rssi(host_name, rssi_text):
    """Parse an RSSI cell such as ``"-55 dBm"`` into an int, or None."""
    if rssi_text is None:
        return None
    try:
        return int(float(rssi_text.replace(" dBm", "")))
    except (ValueError, OverflowError):
        print(f"Invalid RSSI value for {host_name}: {rssi_text}")
        return None


def _parse_frequency_mhz(connection_type):
    """Extract the band from the connection-type text and return it in MHz.

    The first number in the text is read as GHz and multiplied by 1000.
    Returns None when the text is empty or contains no number, instead of
    raising ``IndexError``.
    """
    if not connection_type:
        return None
    match = _NUMBER_RE.search(connection_type)
    if match is None:
        return None
    return round(float(match.group()) * 1000)


def _estimate_distance_m(rssi, frequency_mhz):
    """Estimate the distance to a device in metres from its RSSI.

    Uses the free-space path-loss relation
    ``FSPL(dB) = 20*log10(d_km) + 20*log10(f_MHz) + 32.44`` with the path
    loss taken as ``ASSUMED_TX_POWER_DBM - rssi``.  The 32.44 constant
    yields kilometres, so the result is converted to metres to match the
    "Distance (m)" column.

    Returns None when either input is unknown or the frequency is not
    positive (``log10`` would be undefined).
    """
    if rssi is None or frequency_mhz is None or frequency_mhz <= 0:
        return None
    path_loss_db = ASSUMED_TX_POWER_DBM - rssi
    distance_km = 10 ** (
        (path_loss_db - 20 * math.log10(frequency_mhz) - 32.44) / 20
    )
    return round(distance_km * 1000, 2)


# fetch devices information
def fetch_devices_data():
    """Scrape the connected-devices page.

    Returns:
        A dict with ``"online_devices"`` and ``"offline_devices"`` lists of
        device entries (online devices sorted by estimated distance, devices
        with unknown distance last), or None when the page could not be
        retrieved.
    """
    devices_url = base_url + devices_page_scope
    try:
        devices_response = session.get(devices_url, timeout=REQUEST_TIMEOUT)
    except requests.RequestException as exc:
        print(f"Failed to fetch devices page: {exc}")
        return None
    if not devices_response.ok:
        print(f"Failed to fetch devices page: HTTP {devices_response.status_code}")
        return None

    tree = html.fromstring(devices_response.content)

    devices_data = {
        "online_devices": [],
        "offline_devices": []
    }

    # process online devices ([1:-1] drops the first and last table rows)
    online_rows = tree.xpath('//div[@id="online-private"]/table//tr')[1:-1]
    for row in online_rows:
        host_name = _first_text(row, './/td[@headers="host-name"]/a//text()')
        if host_name is None:
            # Not a device row (no host-name link); nothing to record.
            continue
        dhcp_or_reserved = _first_text(row, './/td[@headers="dhcp-or-reserved"]//text()')
        rssi_text = _first_text(row, './/td[@headers="rssi-level"]//text()')
        rssi = _parse_rssi(host_name, rssi_text)
        connection_type = _first_text(row, './/td[@headers="connection-type"]//text()')
        frequency = _parse_frequency_mhz(connection_type)
        distance = _estimate_distance_m(rssi, frequency)

        devices_data["online_devices"].append({
            "name": host_name,
            "online": True,
            "ip_type": dhcp_or_reserved,
            "rssi": rssi,
            "network": connection_type,
            "frequency": frequency,
            "distance": distance,
            "device_info": _parse_device_info(
                row.xpath('.//td[@headers="host-name"]//div[@class="device-info"]/dl/dd')
            ),
        })

    # process offline devices
    offline_rows = tree.xpath('//div[@id="offline-private"]/table//tr')[1:-1]
    for row in offline_rows:
        host_name = _first_text(row, './/td[@headers="offline-device-host-name"]/a//text()')
        if host_name is None:
            continue
        devices_data["offline_devices"].append({
            "name": host_name,
            "online": False,
            "ip_type": None,
            "rssi": None,
            "network": None,
            "frequency": None,
            "distance": None,
            "device_info": _parse_device_info(
                row.xpath('.//td[@headers="offline-device-host-name"]//div[@class="device-info"]/dl/dd')
            ),
        })

    # sort online devices by distance, unknown distances last
    online = devices_data["online_devices"]
    known_distances = [device for device in online if device["distance"] is not None]
    unknown_distances = [device for device in online if device["distance"] is None]
    known_distances.sort(key=lambda device: device["distance"])
    devices_data["online_devices"] = known_distances + unknown_distances

    return devices_data


# format keys
def format_keys(data):
    """Return a copy of *data* with keys lower-cased and spaces replaced by underscores."""
    return {k.lower().replace(" ", "_"): v for k, v in data.items()}


# log data
def log_devices_data(devices_data):
    """Append one JSON line with the full device snapshot to ``device_data.log``."""
    timestamp = datetime.now().strftime(LOG_TIMESTAMP_FORMAT)
    combined_devices = devices_data["online_devices"] + devices_data["offline_devices"]

    # format keys for each device entry
    formatted_devices = [format_keys(device) for device in combined_devices]

    log_entry = {"timestamp": timestamp, "data": formatted_devices}
    with open("device_data.log", "a") as log_file:
        log_file.write(json.dumps(log_entry) + "\n")


# log status changes
def log_device_status_change(device_name, online):
    """Append one JSON line recording a device's online state to ``device_list.log``."""
    timestamp = datetime.now().strftime(LOG_TIMESTAMP_FORMAT)
    log_entry = {"timestamp": timestamp, "name": device_name, "online": online}
    with open("device_list.log", "a") as log_file:
        log_file.write(json.dumps(format_keys(log_entry)) + "\n")


# display devices
def display_devices(devices_data, current_device_status):
    """Clear the terminal and print a fixed-width table of all devices.

    Args:
        devices_data: dict as returned by ``fetch_devices_data``.
        current_device_status: mapping of device name to a dict holding
            ``"online"`` and ``"last_activity"``; used for the last column.
    """
    os.system('cls' if os.name == 'nt' else 'clear')

    header = f"{'Name':<20}{'Online':<10}{'IP Type':<15}{'RSSI':<10}{'Network':<20}{'Frequency (MHz)':<20}{'Distance (m)':<15}{'Last Activity':<20}"
    print(header)

    def cell(value):
        # Unknown values are shown as the literal text 'null'.
        return 'null' if value is None else str(value)

    def format_device_row(device, last_activity):
        return (
            f"{device['name']:<20}"
            f"{str(device['online']):<10}"
            f"{cell(device['ip_type']):<15}"
            f"{cell(device['rssi']):<10}"
            f"{cell(device['network']):<20}"
            f"{cell(device['frequency']):<20}"
            f"{cell(device['distance']):<15}"
            f"{cell(last_activity):<20}"
        )

    # online devices first, then offline devices
    for device in devices_data["online_devices"] + devices_data["offline_devices"]:
        last_activity = current_device_status.get(device["name"], {}).get("last_activity", None)
        print(format_device_row(device, last_activity))


# main
if login():
    current_device_status = {}

    try:
        while True:
            devices_data = fetch_devices_data()

            # A failed poll returns None: keep the previous state and retry
            # after the usual delay.
            if devices_data:
                log_devices_data(devices_data)

                new_device_status = {}
                for d in devices_data["online_devices"] + devices_data["offline_devices"]:
                    device_name = d["name"]
                    online = d["online"]
                    previous = current_device_status.get(device_name)

                    if previous is None:
                        # First sighting: record and log the initial state.
                        new_device_status[device_name] = {
                            "online": online,
                            "last_activity": datetime.now().strftime(ACTIVITY_TIMESTAMP_FORMAT)
                        }
                        log_device_status_change(device_name, online)
                        continue

                    # log only on status change
                    if previous["online"] != online:
                        previous["last_activity"] = datetime.now().strftime(ACTIVITY_TIMESTAMP_FORMAT)
                        log_device_status_change(device_name, online)
                        previous["online"] = online
                    new_device_status[device_name] = previous

                current_device_status = new_device_status
                display_devices(devices_data, current_device_status)

            # refresh
            time.sleep(REFRESH_TIME)

    except KeyboardInterrupt:
        print("Stopped by user.")
    finally:
        session.close()