229 lines
6.6 KiB
Python
229 lines
6.6 KiB
Python
#!/usr/bin/env python3
|
|
import sys
|
|
import os
|
|
import random
|
|
import re
|
|
import pandas as pd
|
|
|
|
# --- Redaction functions ---
|
|
def redact_name(name: str) -> str:
    """Mask a name, leaving only its first non-whitespace character visible.

    Whitespace is preserved in place; every other character after the first
    visible one is replaced with ``'*'``.

    Args:
        name: The value to redact.

    Returns:
        The masked string. Non-string values and the empty string are
        returned unchanged.
    """
    if not (isinstance(name, str) and name != ''):
        return name

    masked = []
    kept_first = False
    for ch in name:
        if ch.isspace():
            # Whitespace never counts as the "first" character and is kept.
            masked.append(ch)
        elif kept_first:
            masked.append('*')
        else:
            kept_first = True
            masked.append(ch)

    return ''.join(masked)
|
|
|
|
def redact_job_title(val: str) -> str:
    """Mask a job title, keeping only its first non-whitespace character.

    Whitespace stays where it is; all remaining characters become ``'*'``.

    Args:
        val: The value to redact.

    Returns:
        The masked string. Non-string values and the empty string are
        returned unchanged.
    """
    if not isinstance(val, str) or val == '':
        return val

    # Position of the single character that stays readable (None when the
    # value consists of whitespace only).
    visible_at = next(
        (pos for pos, ch in enumerate(val) if not ch.isspace()),
        None,
    )
    return ''.join(
        ch if ch.isspace() or pos == visible_at else '*'
        for pos, ch in enumerate(val)
    )
|
|
|
|
def redact_office_number(val: str) -> str:
    """Mask every digit of an office number except the first one.

    Non-digit characters (letters, dashes, spaces, ...) are left untouched.

    Args:
        val: The value to redact.

    Returns:
        The masked string. Non-string values and the empty string are
        returned unchanged.
    """
    if not isinstance(val, str) or val == '':
        return val

    text = str(val)
    # Index of the one digit that remains visible; -1 when there is none.
    keep_at = next((pos for pos, ch in enumerate(text) if ch.isdigit()), -1)
    return ''.join(
        '*' if ch.isdigit() and pos != keep_at else ch
        for pos, ch in enumerate(text)
    )
|
|
|
|
def redact_local_domain(addr: str) -> str:
    """Mask the local part of an address of the form ``local@domain``.

    Surrounding whitespace is stripped first. The part before the first
    ``'@'`` is redacted as follows:

    * empty: left empty;
    * one character: replaced by ``'*'``;
    * two characters: left as-is (there is no "middle" to mask);
    * longer: first and last characters kept, everything between masked.

    Everything from the first ``'@'`` onwards is preserved verbatim. This
    includes a trailing ``'@'`` with an empty domain, so ``"user@"`` becomes
    ``"u**r@"`` rather than losing its separator.

    Args:
        addr: The address to redact. Values without ``'@'`` are treated as
            consisting of a local part only.

    Returns:
        The redacted address. Non-string values and the empty string are
        returned unchanged.
    """
    if not isinstance(addr, str) or addr == '':
        return addr

    # partition() keeps the separator, so it is re-emitted exactly when it
    # was present in the input, even if the domain after it is empty.
    local, sep, domain = addr.strip().partition('@')

    if len(local) == 1:
        red_local = '*'
    elif len(local) > 2:
        red_local = local[0] + '*' * (len(local) - 2) + local[-1]
    else:
        # Empty or two-character local parts are passed through unchanged.
        red_local = local

    return f"{red_local}{sep}{domain}"
|
|
|
|
def redact_phone(num: str) -> str:
    """Mask all digits of a phone number except the last one.

    Formatting characters such as ``'+'``, spaces, dashes and parentheses are
    preserved in their original positions.

    Args:
        num: The value to redact.

    Returns:
        The masked string. Non-string values, the empty string and strings
        containing no digits are returned unchanged.
    """
    if not isinstance(num, str) or num == '':
        return num

    text = str(num)
    digit_positions = [pos for pos, ch in enumerate(text) if ch.isdigit()]
    if not digit_positions:
        return text

    # Only the final digit stays readable.
    keep_at = digit_positions[-1]
    pieces = []
    for pos, ch in enumerate(text):
        if ch.isdigit() and pos != keep_at:
            pieces.append('*')
        else:
            pieces.append(ch)
    return ''.join(pieces)
|
|
|
|
def redact_profile_pic(pic: str, keep: int = 20) -> str:
    """Mask everything after the first ``keep`` characters of a picture value.

    Args:
        pic: The value to redact (for example a URL or file path).
        keep: Number of leading characters left visible. Defaults to 20,
            which matches the previously hard-coded prefix length.

    Returns:
        ``pic`` unchanged when it is not a string, is empty, or is at most
        ``keep`` characters long; otherwise the first ``keep`` characters
        followed by one ``'*'`` per remaining character (length preserved).

    Raises:
        ValueError: If ``keep`` is negative.
    """
    if not isinstance(pic, str) or pic == '':
        return pic
    if keep < 0:
        raise ValueError("keep must be non-negative")

    # A negative repeat count yields '', so short values pass through intact.
    return pic[:keep] + '*' * (len(pic) - keep)
|
|
|
|
def redact_dept(val: str) -> str:
    """Mask a department name, keeping only its first word readable.

    Leading whitespace and the first whitespace-delimited word are kept.
    After that word, every non-whitespace character becomes ``'*'`` while
    whitespace is preserved.

    Args:
        val: The value to redact.

    Returns:
        The masked string. Non-string values and the empty string are
        returned unchanged.
    """
    if not isinstance(val, str) or val == '':
        return val

    pieces = []
    in_or_after_first_word = False
    first_word_finished = False
    for ch in val:
        if ch.isspace():
            # The first whitespace seen after a word started ends that word.
            if in_or_after_first_word:
                first_word_finished = True
            pieces.append(ch)
        elif first_word_finished:
            pieces.append('*')
        else:
            in_or_after_first_word = True
            pieces.append(ch)
    return ''.join(pieces)
|
|
|
|
def redact_office_location(val: str) -> str:
    """Mask either the digits or the letters of an office location.

    One of the two strategies is picked at random on every call, so the
    result for a given input is not deterministic:

    * strategy 1 replaces every digit with ``'*'``;
    * strategy 2 replaces every alphabetic character with ``'*'``.

    All other characters are kept.

    Args:
        val: The value to redact.

    Returns:
        The masked string. Non-string values and the empty string are
        returned unchanged (no random draw happens for them).
    """
    if not isinstance(val, str) or val == '':
        return val

    strategy = random.choice([1, 2])
    should_mask = str.isdigit if strategy == 1 else str.isalpha

    pieces = []
    for ch in val:
        pieces.append('*' if should_mask(ch) else ch)
    return ''.join(pieces)
|
|
|
|
# --- Exposure detection & masking ---
|
|
# Matches an email address. Group 1 captures the local part (ASCII letters,
# digits and . _ % + -; note that '*' is NOT part of this class), group 2
# captures the domain, which must end in a dot followed by at least two
# ASCII letters.
EMAIL_RE = re.compile(r'([A-Za-z0-9._%+\-]+)@([A-Za-z0-9.\-]+\.[A-Za-z]{2,})')
|
|
# Matches phone-number candidates: an optional leading '+' or '(', one to
# four digits, an optional separator, then two or more digits (each optionally
# followed by a separator) and a final digit -- i.e. at least four digits.
PHONE_CAND_RE = re.compile(r'[\+\(]?\d{1,4}[\)\-\s\.\/]?(?:\d[\-\s\.\/\(\)]?){2,}\d')
|
|
|
|
def mask_phone_in_text(s: str, min_visible_digits: int = 4):
    """Blank out phone-number-like sequences found anywhere in a string.

    Every ``PHONE_CAND_RE`` match containing at least ``min_visible_digits``
    digits is overwritten entirely (digits and separators alike) with ``'*'``,
    so the length of the text is preserved.

    Args:
        s: The text to scan.
        min_visible_digits: Minimum number of digits a match must contain to
            be treated as an exposed phone number.

    Returns:
        A ``(masked_text, exposures)`` tuple, where ``exposures`` is a list of
        ``(start, end, original_match)`` tuples for each masked span.
        Non-string values and the empty string yield ``(s, [])``.
    """
    if not isinstance(s, str) or s == '':
        return s, []

    chars = list(s)
    found = []
    for match in PHONE_CAND_RE.finditer(s):
        candidate = match.group(0)
        digit_count = len([ch for ch in candidate if ch.isdigit()])
        if digit_count < min_visible_digits:
            continue
        begin, end = match.span()
        # Same-length replacement keeps all later offsets valid.
        chars[begin:end] = '*' * (end - begin)
        found.append((begin, end, candidate))

    return ''.join(chars), found
|
|
|
|
def mask_email_local_in_text(s: str):
    """Blank out the local part of every unredacted email address in a string.

    For each ``EMAIL_RE`` match, the local part (group 1) is overwritten with
    ``'*'`` characters of the same length; the ``'@'`` and the domain are
    kept. Addresses whose local part is already partially redacted are left
    alone.

    Args:
        s: The text to scan.

    Returns:
        A ``(masked_text, exposures)`` tuple, where ``exposures`` is a list of
        ``(start, end, original_local_part)`` tuples for each masked span.
        Non-string values and the empty string yield ``(s, [])``.
    """
    if not isinstance(s, str) or s == '':
        return s, []

    new = list(s)
    exposures = []
    for m in EMAIL_RE.finditer(s):
        local_start, local_end = m.span(1)
        local = m.group(1)

        # EMAIL_RE's local-part class does not include '*', so for an address
        # such as "j**n@x.com" the match only starts at "n". Checking the
        # matched text for '*' alone can therefore never detect an existing
        # redaction; look at the character just before the match as well.
        preceded_by_mask = local_start > 0 and s[local_start - 1] == '*'
        if preceded_by_mask or '*' in local:
            continue

        new[local_start:local_end] = '*' * (local_end - local_start)
        exposures.append((local_start, local_end, local))

    return ''.join(new), exposures
|
|
|
|
def scan_and_mask_exposures(df: pd.DataFrame, min_visible_digits: int = 4):
    """Return a copy of ``df`` with leftover phone numbers and emails masked.

    Every non-empty string cell is passed through ``mask_phone_in_text`` and
    then ``mask_email_local_in_text``; cells whose text changes are updated.
    Non-string and empty cells are left untouched.

    Cells are read and written by position rather than by label. Label-based
    writes would overwrite every row sharing an index label when the index is
    not unique, and cells under duplicated column names could not be read as
    scalars at all.

    Args:
        df: The frame to scan. It is not modified.
        min_visible_digits: Forwarded to ``mask_phone_in_text``.

    Returns:
        A new ``DataFrame`` with the same shape, index and columns.
    """
    df = df.copy()

    for col_pos in range(df.shape[1]):
        # Snapshot the column so writes below cannot affect the iteration.
        values = df.iloc[:, col_pos].tolist()
        for row_pos, val in enumerate(values):
            if not isinstance(val, str) or val == '':
                continue
            modified, _ = mask_phone_in_text(val, min_visible_digits)
            modified, _ = mask_email_local_in_text(modified)
            if modified != val:
                df.iat[row_pos, col_pos] = modified

    return df
|
|
|
|
# --- DataFrame processing pipeline ---
|
|
def process_df(df: pd.DataFrame) -> pd.DataFrame:
    """Apply the per-column redaction rules and return a redacted copy.

    Two kinds of rules run, in this order:

    1. Name-based rules: each listed column, if present, is mapped through
       its redaction function.
    2. Position-based rules: the column at index 5 (spreadsheet column F) is
       treated as the job title and the column at index 7 (column H) as the
       office number, provided the frame has that many columns.

    Because the positional rules run afterwards and select purely by index, a
    column that was already handled by name is redacted a second time if it
    happens to sit at index 5 or 7.

    Args:
        df: The input frame. It is not modified.

    Returns:
        A new ``DataFrame`` with the redactions applied.
    """
    result = df.copy()

    by_name = (
        ('Name', redact_name),
        ('Email Address', redact_local_domain),
        ('Chat Address', redact_local_domain),
        ('Mobile', redact_phone),
        ('Work Phone', redact_phone),
        ('Profile Picture', redact_profile_pic),
        ('Department', redact_dept),
        ('Office Location', redact_office_location),
    )
    for label, redactor in by_name:
        if label in result:
            result[label] = result[label].map(redactor)

    by_position = (
        (5, redact_job_title),      # Column F (index 5) -> Job Title
        (7, redact_office_number),  # Column H (index 7) -> Office Number
    )
    for position, redactor in by_position:
        if len(result.columns) > position:
            label = result.columns[position]
            result[label] = result[label].map(redactor)

    return result
|
|
|
|
def main():
    """Command-line entry point: redact a CSV file and write the result.

    Usage: ``python script.py inputfilename.csv [outputfilename.csv]``

    When no output path is given (or it is empty), the output is written next
    to the input as ``<input stem>_redacted<input extension>``, falling back
    to ``.csv`` if the input has no extension. The CSV is read with every
    column as a string and without converting blank cells to NaN.

    Exits with status 1 after printing a usage line if no input file is given.
    """
    cli_args = sys.argv[1:]
    if not cli_args:
        print("Usage: python script.py inputfilename.csv [outputfilename.csv]")
        sys.exit(1)

    source_path = cli_args[0]
    target_path = cli_args[1] if len(cli_args) > 1 else None
    if not target_path:
        stem, suffix = os.path.splitext(source_path)
        target_path = f"{stem}_redacted{suffix or '.csv'}"

    frame = pd.read_csv(source_path, dtype=str, keep_default_na=False)

    # Column-specific redaction first, then a sweep over every cell for any
    # phone numbers or email addresses still visible.
    frame = process_df(frame)
    frame = scan_and_mask_exposures(frame, min_visible_digits=4)

    frame.to_csv(target_path, index=False)
    print(f"Wrote redacted file to: {target_path}")


if __name__ == '__main__':
    main()
|