#!/usr/bin/env python3
"""Redact personally identifying columns of a CSV file.

Usage: python script.py inputfilename.csv [outputfilename.csv]

The script runs two passes over the data:

1. ``process_df`` applies a column-specific redactor to a fixed set of
   column names, plus two columns addressed by position (index 5 and 7).
2. ``scan_and_mask_exposures`` scans every string cell for phone-like digit
   runs and e-mail addresses that are still readable and masks them.
"""
import sys
import os
import random
import re

import pandas as pd


# --- Redaction functions ---

def _mask_after_first_visible(text):
    """Mask every non-whitespace character except the first one with '*'.

    Whitespace is left in place so the word layout stays recognisable.
    Non-string or empty input is returned unchanged.
    """
    if not isinstance(text, str) or text == '':
        return text
    masked = []
    first_kept = False
    for ch in text:
        if ch.isspace():
            masked.append(ch)
        elif not first_kept:
            # The very first visible character is the only one preserved.
            first_kept = True
            masked.append(ch)
        else:
            masked.append('*')
    return ''.join(masked)


def redact_name(name: str) -> str:
    """Return *name* with all but its first visible character masked.

    Whitespace is preserved; non-string or empty input is returned as is.
    """
    return _mask_after_first_visible(name)


def redact_job_title(val: str) -> str:
    """Return *val* with all but its first visible character masked.

    Uses the same rule as ``redact_name``: whitespace is preserved and
    non-string or empty input is returned as is.
    """
    return _mask_after_first_visible(val)


def redact_office_number(val: str) -> str:
    """Mask every digit after the first digit; leave other characters alone.

    Non-string or empty input is returned unchanged.
    """
    if not isinstance(val, str) or val == '':
        return val
    masked = []
    first_digit_kept = False
    for ch in val:
        if ch.isdigit() and first_digit_kept:
            masked.append('*')
            continue
        if ch.isdigit():
            first_digit_kept = True
        masked.append(ch)
    return ''.join(masked)


def redact_local_domain(addr: str) -> str:
    """Mask the interior of the local part of an ``local@domain`` address.

    The input is stripped of surrounding whitespace first. The local part is
    everything before the first '@' (or the whole string when there is no
    '@'):

    * empty or two characters long: kept unchanged,
    * one character long: replaced by '*',
    * longer: first and last characters kept, the rest replaced by '*'.

    The domain is never altered. When the domain is empty the result is the
    redacted local part alone (no '@' is emitted). Non-string or empty input
    is returned unchanged.
    """
    if not isinstance(addr, str) or addr == '':
        return addr
    local, _sep, domain = addr.strip().partition('@')
    if len(local) == 1:
        red_local = '*'
    elif len(local) > 2:
        red_local = local[0] + '*' * (len(local) - 2) + local[-1]
    else:
        red_local = local
    return f"{red_local}@{domain}" if domain != '' else red_local


def redact_phone(num: str) -> str:
    """Mask every digit except the last one; keep separators untouched.

    A string without digits, a non-string, or an empty string is returned
    unchanged.
    """
    if not isinstance(num, str) or num == '':
        return num
    digit_positions = [i for i, ch in enumerate(num) if ch.isdigit()]
    if not digit_positions:
        return num
    last_digit_pos = digit_positions[-1]
    return ''.join(
        '*' if ch.isdigit() and i != last_digit_pos else ch
        for i, ch in enumerate(num)
    )


def redact_profile_pic(pic: str) -> str:
    """Keep the first 20 characters of *pic* and mask the remainder.

    Values of 20 characters or fewer, non-strings, and empty strings are
    returned unchanged.
    """
    if not isinstance(pic, str) or pic == '':
        return pic
    keep = 20
    if len(pic) <= keep:
        return pic
    return pic[:keep] + '*' * (len(pic) - keep)


def redact_dept(val: str) -> str:
    """Keep the first word of *val* and mask all later visible characters.

    Leading whitespace and the first whitespace-delimited word are kept;
    every following non-whitespace character becomes '*'. Non-string or
    empty input is returned unchanged.
    """
    if not isinstance(val, str) or val == '':
        return val
    n = len(val)
    end_of_first_word = 0
    while end_of_first_word < n and val[end_of_first_word].isspace():
        end_of_first_word += 1
    while end_of_first_word < n and not val[end_of_first_word].isspace():
        end_of_first_word += 1
    tail = ''.join(
        ch if ch.isspace() else '*' for ch in val[end_of_first_word:]
    )
    return val[:end_of_first_word] + tail


def redact_office_location(val: str) -> str:
    """Mask either all digits or all letters of *val*, chosen at random.

    The choice is made per call with ``random.choice``, so the output is not
    deterministic unless the ``random`` module is seeded. Non-string or empty
    input is returned unchanged.
    """
    if not isinstance(val, str) or val == '':
        return val
    choice = random.choice([1, 2])
    if choice == 1:
        return ''.join('*' if ch.isdigit() else ch for ch in val)
    return ''.join('*' if ch.isalpha() else ch for ch in val)


# --- Exposure detection & masking ---

EMAIL_RE = re.compile(r'([A-Za-z0-9._%+\-]+)@([A-Za-z0-9.\-]+\.[A-Za-z]{2,})')
PHONE_CAND_RE = re.compile(r'[\+\(]?\d{1,4}[\)\-\s\.\/]?(?:\d[\-\s\.\/\(\)]?){2,}\d')


def mask_phone_in_text(s: str, min_visible_digits: int = 4):
    """Mask phone-like digit runs found anywhere in *s*.

    Every ``PHONE_CAND_RE`` match containing at least *min_visible_digits*
    digits is replaced, character for character, by '*' (separators inside
    the match included).

    Returns a ``(masked_text, exposures)`` tuple where *exposures* is a list
    of ``(start, end, matched_text)`` for each masked match. Non-string or
    empty input yields ``(s, [])``.
    """
    if not isinstance(s, str) or s == '':
        return s, []
    chars = list(s)
    exposures = []
    for m in PHONE_CAND_RE.finditer(s):
        seq = m.group(0)
        if sum(1 for ch in seq if ch.isdigit()) >= min_visible_digits:
            chars[m.start():m.end()] = '*' * (m.end() - m.start())
            exposures.append((m.start(), m.end(), seq))
    return ''.join(chars), exposures


def mask_email_local_in_text(s: str):
    """Mask the local part of every readable e-mail address found in *s*.

    Returns a ``(masked_text, exposures)`` tuple where *exposures* is a list
    of ``(start, end, local_part)`` for each masked local part. Non-string or
    empty input yields ``(s, [])``.

    An address whose local part directly follows a '*' is treated as already
    redacted and left alone. ``EMAIL_RE`` cannot match '*', so for a value
    such as ``j**n@example.com`` the regex only sees ``n@example.com``;
    checking the preceding character is what keeps the deliberately
    preserved last character from being masked a second time.
    """
    if not isinstance(s, str) or s == '':
        return s, []
    chars = list(s)
    exposures = []
    for m in EMAIL_RE.finditer(s):
        local_start, local_end = m.start(1), m.end(1)
        if local_start > 0 and s[local_start - 1] == '*':
            # Tail of a local part that has already been masked.
            continue
        chars[local_start:local_end] = '*' * (local_end - local_start)
        exposures.append((local_start, local_end, s[local_start:local_end]))
    return ''.join(chars), exposures


def _mask_cell(val, min_visible_digits):
    """Apply the phone and e-mail text masks to a single cell value."""
    if not isinstance(val, str) or val == '':
        return val
    masked, _ = mask_phone_in_text(val, min_visible_digits)
    masked, _ = mask_email_local_in_text(masked)
    return masked


def scan_and_mask_exposures(df: pd.DataFrame, min_visible_digits: int = 4):
    """Return a copy of *df* with leftover phones/e-mails masked in all cells.

    Every non-empty string cell is passed through ``mask_phone_in_text`` and
    then ``mask_email_local_in_text``; other values are left untouched. The
    input frame is not modified.

    Cells are processed column by column and by position, so frames with a
    non-unique index are handled correctly (label-based writes would hit
    every row sharing the label).
    """
    return df.apply(
        lambda column: column.map(
            lambda val: _mask_cell(val, min_visible_digits)
        )
    )


# --- DataFrame processing pipeline ---

# Redactor applied to each named column, in this order, when it is present.
_COLUMN_REDACTORS = {
    'Name': redact_name,
    'Email Address': redact_local_domain,
    'Chat Address': redact_local_domain,
    'Mobile': redact_phone,
    'Work Phone': redact_phone,
    'Profile Picture': redact_profile_pic,
    'Department': redact_dept,
    'Office Location': redact_office_location,
}


def process_df(df: pd.DataFrame) -> pd.DataFrame:
    """Return a redacted copy of *df*; the input frame is not modified.

    Named columns listed in ``_COLUMN_REDACTORS`` are redacted when present.
    Afterwards the column at position 5 (spreadsheet column F) is treated as
    the job title and the column at position 7 (column H) as the office
    number, whatever their headers are.
    """
    df = df.copy()
    for column, redactor in _COLUMN_REDACTORS.items():
        if column in df:
            df[column] = df[column].map(redactor)

    # Column F (index 5) → Job Title
    if len(df.columns) > 5:
        col_f = df.columns[5]
        df[col_f] = df[col_f].map(redact_job_title)

    # Column H (index 7) → Office Number
    if len(df.columns) > 7:
        col_h = df.columns[7]
        df[col_h] = df[col_h].map(redact_office_number)

    return df


def main():
    """Read the input CSV, redact it, and write the result.

    The input path is the first command-line argument. The output path is
    the optional second argument; when omitted it is the input path with
    ``_redacted`` inserted before the extension ('.csv' if there is none).
    Exits with status 1 when no input path is given.
    """
    if len(sys.argv) < 2:
        print("Usage: python script.py inputfilename.csv [outputfilename.csv]")
        sys.exit(1)

    inp = sys.argv[1]
    outp = sys.argv[2] if len(sys.argv) > 2 else None
    if not outp:
        base, ext = os.path.splitext(inp)
        outp = f"{base}_redacted{ext or '.csv'}"

    # Read everything as text and keep empty cells as '' so the string-based
    # redactors see the raw values rather than NaN.
    df = pd.read_csv(inp, dtype=str, keep_default_na=False)
    redacted = process_df(df)
    redacted = scan_and_mask_exposures(redacted, min_visible_digits=4)
    redacted.to_csv(outp, index=False)
    print(f"Wrote redacted file to: {outp}")


if __name__ == '__main__':
    main()