200 lines
7.2 KiB
Python
200 lines
7.2 KiB
Python
import re
|
|
import requests
|
|
import math
|
|
import json
|
|
import time
|
|
import os
|
|
import yaml
|
|
from lxml import html
|
|
from datetime import datetime
|
|
from rich.console import Console
|
|
from rich.table import Table
|
|
|
|
with open('conf.yml', 'r') as file:
|
|
config = yaml.safe_load(file)
|
|
|
|
ADMIN_USERNAME = config['ADMIN_USERNAME']
|
|
ADMIN_PASSWORD = config['ADMIN_PASSWORD']
|
|
REFRESH_TIME = config['REFRESH_TIME']
|
|
|
|
session = requests.Session()
|
|
console = Console()
|
|
|
|
base_url = 'http://10.0.0.1'
|
|
login_page_scope = '/check.php'
|
|
devices_page_scope = '/connected_devices_computers.php'
|
|
|
|
payload = {
|
|
'username': ADMIN_USERNAME,
|
|
'password': ADMIN_PASSWORD
|
|
}
|
|
|
|
# log in
|
|
def login():
|
|
response = session.post(base_url + login_page_scope, data=payload)
|
|
|
|
if response.ok:
|
|
print("Login successful")
|
|
else:
|
|
print("Login failed")
|
|
return False
|
|
return True
|
|
|
|
# fetch devices information
|
|
def fetch_devices_data():
|
|
devices_url = base_url + devices_page_scope
|
|
devices_response = session.get(devices_url)
|
|
tree = html.fromstring(devices_response.content)
|
|
|
|
def device_info(device_info):
|
|
info_dict = {}
|
|
for info in device_info:
|
|
k = info.xpath('./b//text()')[0].strip() if info.xpath('./b//text()') else None
|
|
v = info.xpath('.//text()')[1].strip() if len(info.xpath('.//text()')) > 1 else None
|
|
if k and v:
|
|
info_dict[k.lower().replace(" ", "_")] = v.strip()
|
|
return info_dict
|
|
|
|
devices_data = {
|
|
"online_devices": [],
|
|
"offline_devices": []
|
|
}
|
|
|
|
# process online devices
|
|
online_devices = tree.xpath('//div[@id="online-private"]/table//tr')[1:-1]
|
|
for row in online_devices:
|
|
host_name = row.xpath('.//td[@headers="host-name"]/a//text()')[0].strip()
|
|
dhcp_or_reserved = row.xpath('.//td[@headers="dhcp-or-reserved"]//text()')[0].strip()
|
|
rssi_text = row.xpath('.//td[@headers="rssi-level"]//text()')[0].strip()
|
|
|
|
try:
|
|
rssi = int(float(rssi_text.replace(" dBm", "")))
|
|
except ValueError:
|
|
print(f"Invalid RSSI value for {host_name}: {rssi_text}")
|
|
rssi = None
|
|
|
|
connection_type = row.xpath('.//td[@headers="connection-type"]//text()')[0].strip()
|
|
frequency = int(float(re.findall(r'\d+\.?\d*', connection_type)[0]) * 1000) if connection_type else None
|
|
distance = round(10 ** ((30 - rssi - (20 * math.log10(frequency)) - 32.44) / 20), 2) if rssi is not None and frequency is not None else None
|
|
|
|
device_entry = {
|
|
"name": host_name,
|
|
"online": True,
|
|
"ip_type": dhcp_or_reserved,
|
|
"rssi": rssi,
|
|
"network": connection_type,
|
|
"frequency": frequency,
|
|
"distance": distance,
|
|
"device_info": device_info(row.xpath('.//td[@headers="host-name"]//div[@class="device-info"]/dl/dd'))
|
|
}
|
|
|
|
devices_data["online_devices"].append(device_entry)
|
|
|
|
# process offline devices
|
|
offline_devices = tree.xpath('//div[@id="offline-private"]/table//tr')[1:-1]
|
|
for row in offline_devices:
|
|
host_name = row.xpath('.//td[@headers="offline-device-host-name"]/a//text()')[0].strip()
|
|
|
|
device_entry = {
|
|
"name": host_name,
|
|
"online": False,
|
|
"ip_type": None,
|
|
"rssi": None,
|
|
"network": None,
|
|
"frequency": None,
|
|
"distance": None,
|
|
"device_info": device_info(row.xpath('.//td[@headers="offline-device-host-name"]//div[@class="device-info"]/dl/dd'))
|
|
}
|
|
|
|
devices_data["offline_devices"].append(device_entry)
|
|
|
|
# sort online devices by distance
|
|
known_distances = [device for device in devices_data["online_devices"] if device["distance"] is not None]
|
|
unknown_distances = [device for device in devices_data["online_devices"] if device["distance"] is None]
|
|
|
|
sorted_known_distances = sorted(known_distances, key=lambda x: x["distance"])
|
|
devices_data["online_devices"] = sorted_known_distances + unknown_distances
|
|
|
|
return devices_data
|
|
|
|
# format keys
|
|
def format_keys(data):
|
|
return {k.lower().replace(" ", "_"): v for k, v in data.items()}
|
|
|
|
# log data
|
|
def log_devices_data(devices_data):
|
|
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
|
combined_devices = devices_data["online_devices"] + devices_data["offline_devices"]
|
|
|
|
# format keys for each device entry
|
|
formatted_devices = [format_keys(device) for device in combined_devices]
|
|
log_entry = {"timestamp": timestamp, "data": formatted_devices}
|
|
|
|
with open("device_data.log", "a") as log_file:
|
|
log_file.write(json.dumps(log_entry) + "\n")
|
|
|
|
# log status changes
|
|
def log_device_status_change(device_name, online):
|
|
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
|
log_entry = {"timestamp": timestamp, "name": device_name, "online": online}
|
|
|
|
with open("device_list.log", "a") as log_file:
|
|
log_file.write(json.dumps(format_keys(log_entry)) + "\n")
|
|
|
|
# display devices
|
|
def display_devices(devices_data, current_device_status):
|
|
os.system('cls' if os.name == 'nt' else 'clear')
|
|
|
|
header = f"{'Name':<20}{'Online':<10}{'IP Type':<15}{'RSSI':<10}{'Network':<20}{'Frequency (MHz)':<20}{'Distance (m)':<15}{'Last Activity':<20}"
|
|
print(header)
|
|
|
|
def format_device_row(device, last_activity):
|
|
return f"{device['name']:<20}{str(device['online']):<10}{(device['ip_type'] if device['ip_type'] is not None else 'null'):<15}{(str(device['rssi']) if device['rssi'] is not None else 'null'):<10}{(device['network'] if device['network'] is not None else 'null'):<20}{(str(device['frequency']) if device['frequency'] is not None else 'null'):<20}{(str(device['distance']) if device['distance'] is not None else 'null'):<15}{(last_activity if last_activity is not None else 'null'):<20}"
|
|
|
|
# add online devices
|
|
for device in devices_data["online_devices"]:
|
|
last_activity = current_device_status.get(device["name"], {}).get("last_activity", None)
|
|
print(format_device_row(device, last_activity))
|
|
|
|
# add offline devices
|
|
for device in devices_data["offline_devices"]:
|
|
last_activity = current_device_status.get(device["name"], {}).get("last_activity", None)
|
|
print(format_device_row(device, last_activity))
|
|
|
|
# main
|
|
if login():
|
|
current_device_status = {}
|
|
|
|
try:
|
|
while True:
|
|
devices_data = fetch_devices_data()
|
|
if devices_data:
|
|
log_devices_data(devices_data)
|
|
|
|
new_device_status = {}
|
|
for d in devices_data["online_devices"] + devices_data["offline_devices"]:
|
|
device_name = d["name"]
|
|
online = d["online"]
|
|
if device_name in current_device_status:
|
|
new_device_status[device_name] = current_device_status[device_name]
|
|
if current_device_status[device_name]["online"] != online:
|
|
new_device_status[device_name]["last_activity"] = datetime.now().strftime("%y-%m-%d %H:%M:%S")
|
|
# log only on status change
|
|
if online != current_device_status[device_name]["online"]:
|
|
log_device_status_change(device_name, online)
|
|
new_device_status[device_name]["online"] = online
|
|
else:
|
|
new_device_status[device_name] = {
|
|
"online": online,
|
|
"last_activity": datetime.now().strftime("%y-%m-%d %H:%M:%S")
|
|
}
|
|
log_device_status_change(device_name, online)
|
|
|
|
current_device_status = new_device_status
|
|
display_devices(devices_data, current_device_status)
|
|
# refesh
|
|
time.sleep(REFRESH_TIME)
|
|
except KeyboardInterrupt:
|
|
print("Stopped by user.")
|
|
finally:
|
|
session.close()
|