import gradio as gr
from langchain_community.document_loaders import PyPDFLoader, UnstructuredFileLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_chroma import Chroma
from langchain.chains import LLMChain
from langchain_groq import ChatGroq
from langchain.prompts import PromptTemplate
from langchain.schema import Document
from typing import List, Dict
import os
import tempfile
import re, json
from usage_logging import record_visit, migrate_legacy_jsonl_to_event_files, rebuild_visits_rollup_from_event_files
from docx import Document as DocxDocument

ADMIN_KEY = os.environ.get("ADMIN_KEY", "")

# Initialize embeddings
embeddings = HuggingFaceEmbeddings()

# Initialize separate vector stores for resumes and culture docs
resume_store = Chroma(
    collection_name="resumes",
    embedding_function=embeddings,
    persist_directory="./chroma_db"
)
culture_store = Chroma(
    collection_name="culture_docs",
    embedding_function=embeddings,
    persist_directory="./chroma_db"
)

# Initialize LLM
llm = ChatGroq(
    api_key=os.environ["GROQ_API_KEY"],
    model_name="openai/gpt-oss-120b",
    temperature=0,
    seed=42
)

def _parse_json_or_raise(text: str) -> dict:
    """
    Robust-ish JSON extraction: supports raw JSON or JSON inside ``` blocks.
    """
    text = text.strip()
    # pull JSON from code fences if present
    m = re.search(r"```(?:json)?\s*(\{.*?\})\s*```", text, flags=re.S)
    if m:
        text = m.group(1).strip()
    return json.loads(text)
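
# Example for _parse_json_or_raise (comment-only sketch, not executed):
#   _parse_json_or_raise('{"a": 1}')                 -> {"a": 1}
#   _parse_json_or_raise('```json\n{"a": 1}\n```')   -> {"a": 1}
# Raises json.JSONDecodeError (via json.loads) if no parseable object is found.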

def _safe_parse_json(text: str) -> dict:
    try:
        return _parse_json_or_raise(text)
    except Exception:
        return {}

def score_from_binary_matches(matched: list, missing: list) -> int:
    total = (len(matched) if matched else 0) + (len(missing) if missing else 0)
    if total == 0:
        return 0
    return int(round((len(matched) / total) * 100))
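
# Worked example for score_from_binary_matches (comment-only sketch):
#   matched = ["python", "sql", "docker"], missing = ["kubernetes"]
#   -> round(3 / 4 * 100) = 75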

def score_from_required_list(required: list, matched: list) -> int:
    required_norm = []
    for r in required or []:
        if isinstance(r, str):
            s = r.strip().lower()
            if s:
                required_norm.append(s)
    required_set = set(required_norm)
    if not required_set:
        return 0
    matched_norm = []
    for m in matched or []:
        if isinstance(m, dict):
            v = (m.get("skill") or m.get("attribute") or "")
        else:
            v = str(m)
        s = v.strip().lower()
        if s:
            matched_norm.append(s)
    matched_set = set(matched_norm)
    hits = len(required_set.intersection(matched_set))
    return int(round((hits / len(required_set)) * 100))
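
# Worked example for score_from_required_list (comment-only sketch):
#   required = ["Python", "SQL", "Kubernetes"]
#   matched  = [{"skill": "python"}, {"skill": "SQL "}]
#   -> 2 of 3 required skills hit -> round(2 / 3 * 100) = 67
# Matched entries outside the required set add nothing; comparison is
# case-insensitive after stripping whitespace.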

def score_culture_weighted(required: list, matched: list) -> int:
    """
    Weighted evidence coverage over required culture attributes:
    direct evidence = 1.0, inferred evidence = 0.5.
    """
    if not required:
        return 0
    required_norm = set(
        r.strip().lower() for r in required if isinstance(r, str) and r.strip()
    )
    if not required_norm:
        return 0
    score_sum = 0.0
    for m in matched or []:
        attr = (m.get("attribute") or "").strip().lower()
        et = (m.get("evidence_type") or "").strip().lower()
        if attr not in required_norm:
            continue
        if et == "direct":
            score_sum += 1.0
        elif et == "inferred":
            score_sum += 0.5
    pct = (score_sum / len(required_norm)) * 100
    return int(round(pct))
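
# Worked example for score_culture_weighted (comment-only sketch):
#   4 required attributes; one matched with direct evidence (1.0) and one
#   with inferred evidence (0.5) -> (1.0 + 0.5) / 4 * 100 = 37.5 -> 38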

def label_from_score(score: int) -> str:
    if score >= 70:
        return "Strong fit"
    if score >= 50:
        return "Moderate fit"
    return "Not a fit"

_BANNED_IMPLIED_TOKENS = [
    "remote", "onsite", "hybrid", "time zone", "timezone", "availability",
    "executive presence", "polish", "tier-1", "high-bar", "top-tier",
    "ivy", "faang", "big tech", "prestige", "pedigree"
]

def build_implied_skill_map(job_description: str, verified_required_skills: list) -> dict:
    """
    Uses the LLM to propose a correlation map once per JD.
    Output is sanitized and restricted to verified_required_skills only.
    """
    # normalize verified skills
    vskills = []
    for s in verified_required_skills or []:
        if isinstance(s, str) and s.strip():
            vskills.append(s.strip().lower())
    vset = set(vskills)
    if not vset:
        return {}
    chain = LLMChain(llm=llm, prompt=implied_map_prompt)
    raw = chain.run({
        "job_description": job_description,
        "verified_required_skills": ", ".join(sorted(vset))
    })
    obj = _safe_parse_json(raw) or {}
    correlations = obj.get("correlations") or {}
    if not isinstance(correlations, dict):
        return {}
    out = {}
    for k, v in correlations.items():
        if not isinstance(k, str) or not isinstance(v, list):
            continue
        skill = k.strip().lower()
        # Restrict keys to verified required skills only
        if skill not in vset:
            continue
        signals = []
        for sig in v:
            if not isinstance(sig, str):
                continue
            s = sig.strip().lower()
            if not (1 <= len(s) <= 30):
                continue
            # ban risky/proxy terms
            if any(bt in s for bt in _BANNED_IMPLIED_TOKENS):
                continue
            signals.append(s)
        # de-dup, cap
        signals = list(dict.fromkeys(signals))[:5]
        # need at least 2 signals
        if len(signals) >= 2:
            out[skill] = signals
    return out
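
# Example of a sanitized map build_implied_skill_map could return
# (hypothetical values, assuming these skills were quote-verified):
#   {"information retrieval": ["rag", "embeddings", "vector database"]}
# Keys outside the verified skill set, signals containing banned proxy terms,
# and lists with fewer than 2 surviving signals are all dropped.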

def infer_implied_from_map(
    missing_skills: list,
    resume_text: str,
    implied_map: dict,
    min_hits: int = 2
) -> list:
    """
    Deterministic: suggest implied competencies if >= min_hits signals appear in resume_text.
    These suggestions are NOT used for scoring.
    """
    if not implied_map or not missing_skills:
        return []
    resume_l = (resume_text or "").lower()
    out = []
    for s in missing_skills:
        if not isinstance(s, str) or not s.strip():
            continue
        key = s.strip().lower()
        signals = implied_map.get(key)
        if not signals:
            continue
        hits = [sig for sig in signals if sig in resume_l]
        if len(hits) >= min_hits:
            out.append({
                "skill": key,
                "signals": hits[:3],  # show top 3 hits for explainability
                "hit_count": len(hits)
            })
    return out
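
# Deterministic matching sketch for infer_implied_from_map: with the map above
# and a resume containing "built a RAG pipeline with embeddings", the signals
# "rag" and "embeddings" both hit, meeting min_hits=2, so "information
# retrieval" is surfaced as a phone-screen suggestion only, never scored.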

def anonymize_resume_text(text: str):
    """
    Heuristic redaction to remove common personal identifiers from resumes
    (email, phone, URLs, addresses, demographic fields, and likely name header).
    Returns: (sanitized_text, redaction_notes_list)
    """
    redactions = []
    sanitized = text
    # Email addresses
    sanitized2 = re.sub(r'[\w\.-]+@[\w\.-]+\.\w+', '[REDACTED_EMAIL]', sanitized)
    if sanitized2 != sanitized:
        redactions.append("Email addresses removed")
    sanitized = sanitized2
    # Phone numbers (broad heuristic)
    sanitized2 = re.sub(r'(\+?\d[\d\-\(\)\s]{7,}\d)', '[REDACTED_PHONE]', sanitized)
    if sanitized2 != sanitized:
        redactions.append("Phone numbers removed")
    sanitized = sanitized2
    # URLs
    sanitized2 = re.sub(r'(https?://\S+|www\.\S+)', '[REDACTED_URL]', sanitized)
    if sanitized2 != sanitized:
        redactions.append("URLs removed")
    sanitized = sanitized2
    # Physical addresses (heuristic)
    address_patterns = [
        r'\b\d{1,6}\s+\w+(?:\s+\w+){0,4}\s+(Street|St|Avenue|Ave|Road|Rd|Boulevard|Blvd|Lane|Ln|Drive|Dr|Court|Ct|Way|Parkway|Pkwy)\b\.?',
        r'\b(Apt|Apartment|Unit|Suite|Ste)\s*#?\s*\w+\b',
        r'\b\d{5}(?:-\d{4})?\b'  # US ZIP
    ]
    for pat in address_patterns:
        sanitized2 = re.sub(pat, '[REDACTED_ADDRESS]', sanitized, flags=re.IGNORECASE)
        if sanitized2 != sanitized:
            redactions.append("Address/location identifiers removed")
        sanitized = sanitized2
    # Explicit demographic fields
    demographic_patterns = [
        r'\b(gender|sex)\s*:\s*\w+\b',
        r'\b(age)\s*:\s*\d+\b',
        r'\b(dob|date of birth)\s*:\s*[\w\s,/-]+\b',
        r'\b(marital status)\s*:\s*\w+\b',
        r'\b(nationality)\s*:\s*\w+\b',
        r'\b(citizenship)\s*:\s*[\w\s,/-]+\b',
        r'\b(pronouns?)\s*:\s*[\w/]+\b',
    ]
    for pat in demographic_patterns:
        sanitized2 = re.sub(pat, '[REDACTED_DEMOGRAPHIC]', sanitized, flags=re.IGNORECASE)
        if sanitized2 != sanitized:
            redactions.append("Explicit demographic fields removed")
        sanitized = sanitized2
    # Likely name header masking (first line)
    lines = sanitized.splitlines()
    if lines:
        first_line = lines[0].strip()
        if re.fullmatch(r"[A-Za-z]+(?:\s+[A-Za-z]+){1,3}", first_line):
            lines[0] = "[REDACTED_NAME]"
            sanitized = "\n".join(lines)
            redactions.append("Likely name header removed")
    # Cleanup
    sanitized = re.sub(r'\n{3,}', '\n\n', sanitized).strip()
    redactions = sorted(set(redactions))
    return sanitized, redactions
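
# Before/after sketch for anonymize_resume_text (hypothetical input):
#   "Jane Doe\njane@example.com | +1 555 123 4567 | www.linkedin.com/in/janedoe"
# becomes
#   "[REDACTED_NAME]\n[REDACTED_EMAIL] | [REDACTED_PHONE] | [REDACTED_URL]"
# Regex heuristics can miss identifiers, so treat this as risk reduction,
# not a guarantee of anonymity.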

def join_loaded_docs_text(docs):
    """Combine a list of LangChain Documents into a single text blob."""
    return "\n".join([d.page_content for d in docs if getattr(d, "page_content", None)])

def process_candidate_submission(resume_file, job_description: str) -> str:
    # Load and process resume
    if resume_file.name.lower().endswith('.pdf'):
        loader = PyPDFLoader(resume_file.name)
    else:
        loader = UnstructuredFileLoader(resume_file.name)
    resume_doc = loader.load()[0]
    sanitized_resume_text, _ = anonymize_resume_text(resume_doc.page_content)
    # Create proper prompt template
    prompt_template = PromptTemplate(
        input_variables=["resume_text", "job_description"],
        template="""
Given the following resume and job description, create a professional cold email to the candidate:
Resume:
{resume_text}
Job Description:
{job_description}
Generate a concise, compelling cold email to the candidate that highlights their relevant skills and experience and how they align with the job requirements and company. Include a strong call-to-action.
Ensure the email is well-structured, error-free, and tailored to the specific candidate and job description. Do not include any text apart from the email content.
"""
    )
    chain = LLMChain(
        llm=llm,
        prompt=prompt_template
    )
    response = chain.run({
        "resume_text": sanitized_resume_text,
        "job_description": job_description
    })
    return response

def store_culture_docs(culture_files: List[tempfile._TemporaryFileWrapper]) -> str:
    """Store company culture documentation in the vector store"""
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=500,
        chunk_overlap=100
    )
    all_docs = []
    for file in culture_files:
        path = file if isinstance(file, str) else file.name
        if path.lower().endswith('.pdf'):
            loader = PyPDFLoader(path)
        else:
            loader = UnstructuredFileLoader(path)
        docs = loader.load()
        splits = text_splitter.split_documents(docs)
        all_docs.extend(splits)
    culture_store.add_documents(all_docs)
    return f"Successfully stored {len(all_docs)} culture document chunks"

def store_resumes(resume_files: List[tempfile._TemporaryFileWrapper]) -> str:
    """Store resumes in the vector store with proper metadata"""
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=500,
        chunk_overlap=100
    )
    all_docs = []
    for file in resume_files:
        path = file if isinstance(file, str) else file.name
        if path.lower().endswith('.pdf'):
            loader = PyPDFLoader(path)
        else:
            loader = UnstructuredFileLoader(path)
        docs = loader.load()
        raw_text = join_loaded_docs_text(docs)
        sanitized_text, redactions = anonymize_resume_text(raw_text)
        base_doc = Document(page_content=sanitized_text, metadata={})
        resume_id = os.path.splitext(os.path.basename(path))[0]
        # Add metadata to each chunk
        splits = text_splitter.split_documents([base_doc])
        for split in splits:
            split.metadata["resume_id"] = resume_id
            split.metadata["source"] = "resume"
            split.metadata["sanitized"] = True
        all_docs.extend(splits)
    resume_store.add_documents(all_docs)
    return f"Successfully stored {len(resume_files)} resumes"

def verify_analysis(analysis_text: str, source_documents: List[str]) -> Dict:
    verification_prompt = PromptTemplate(
        input_variables=["analysis", "source_docs"],
        template="""
You are a fact-checker. Compare the analysis below against the source documents.
Analysis:
{analysis}
Source documents:
{source_docs}
Rules:
- Ignore any computed score lines and labels.
- Ignore missing lists entirely (for example lines under "Missing skills" or "Missing attributes").
- Verify only evidence-backed match lines that include a colon, for example: "- Python: <evidence snippet>".
- If the evidence snippet (or very close text) is present in the sources, mark ✓. Otherwise mark ✗.
- Do NOT output any numeric score.
Output format exactly:
VERIFIED CLAIMS:
✓ <match line>
✗ <match line>
"""
    )
    # Only verify evidence-backed lines like "- skill: snippet" or "- attribute: snippet"
    evidence_lines = []
    for line in analysis_text.splitlines():
        s = line.strip()
        if s.startswith("- ") and (":" in s):
            evidence_lines.append(s)
    analysis_for_verification = "\n".join(evidence_lines)
    chain = LLMChain(llm=llm, prompt=verification_prompt)
    result = chain.run({
        "analysis": analysis_for_verification,
        "source_docs": "\n---\n".join(source_documents)
    })
    verified_claims = re.findall(r'^\s*✓\s*(.+)$', result, flags=re.M)
    unverified_claims = re.findall(r'^\s*✗\s*(.+)$', result, flags=re.M)
    total = len(verified_claims) + len(unverified_claims)
    factuality_score = 1.0 if total == 0 else (len(verified_claims) / total)
    return {
        "factuality_score": factuality_score,
        "verified_claims": verified_claims,
        "unverified_claims": unverified_claims,
        "verification_result": result
    }

def verify_required_skills(req_items: list, job_description: str) -> dict:
    jd_lower = job_description.lower()
    quote_verified = []
    name_only_verified = []
    unverified = []
    for it in req_items:
        skill = (it.get("skill") or "").strip()
        evidence = (it.get("evidence") or "").strip()
        skill_l = skill.lower()
        evidence_l = evidence.lower()
        # 1) Quote-verified (strong)
        if evidence and evidence_l in jd_lower:
            it2 = dict(it)
            it2["verification_method"] = "quote"
            quote_verified.append(it2)
            continue
        # 2) Name-only verified (weak)
        if skill:
            if len(skill_l) <= 3:
                found = re.search(rf"(^|[^a-z0-9]){re.escape(skill_l)}([^a-z0-9]|$)", jd_lower) is not None
            else:
                found = skill_l in jd_lower
            if found:
                it2 = dict(it)
                it2["verification_method"] = "name_only"
                name_only_verified.append(it2)
                continue
        # 3) Unverified (drop)
        it2 = dict(it)
        it2["verification_method"] = "unverified"
        unverified.append(it2)
    total = len(quote_verified) + len(name_only_verified) + len(unverified)
    verified_total = len(quote_verified) + len(name_only_verified)
    factuality = 1.0 if total == 0 else (verified_total / total)
    return {
        "factuality_score": factuality,
        "quote_verified": quote_verified,
        "name_only_verified": name_only_verified,
        "unverified": unverified,
        "total": total,
        "quote_verified_count": len(quote_verified),
        "name_only_verified_count": len(name_only_verified),
        "unverified_count": len(unverified),
        "verified_count": verified_total,
    }
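
# Classification sketch for verify_required_skills, assuming a hypothetical JD
# "Must have Python and experience with Go":
#   {"skill": "python", "evidence": "Must have Python"} -> quote_verified
#     (the evidence quote appears verbatim in the JD, case-insensitive)
#   {"skill": "go", "evidence": "expert in Go"}         -> name_only_verified
#     (quote absent, but "go" matches on a word boundary; names of 3 chars or
#     fewer use boundary matching to avoid substring false positives)
#   {"skill": "kubernetes", "evidence": ""}             -> unverified
#     (dropped from downstream scoring)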

def self_correct_recommendation(
    original_recommendation: str,
    verification_issues: List[str],
    source_docs: List[str],
    skills_score: int,
    culture_score: int
) -> str:
    """Have LLM revise its recommendation based on verification feedback, without changing fixed scores/policy."""
    correction_prompt = PromptTemplate(
        input_variables=["original_rec", "issues", "source_docs", "skills_score", "culture_score"],
        template="""
Your original hiring recommendation contained some unverified claims. Revise it using ONLY the evidence in the source documents.
Original Recommendation:
{original_rec}
Unverified Claims To Fix (remove or correct these ONLY):
{issues}
Source Documents:
{source_docs}
Fixed Scores (do not change or reinterpret):
- Skills score: {skills_score}%
- Culture score: {culture_score}%
Decision Policy (must remain unchanged):
1) If skills_score >= 70 -> Decision = PROCEED
2) If skills_score < 60 -> Decision = DO NOT PROCEED
3) If 60 <= skills_score < 70 -> Decision = PROCEED only if culture_score >= 70, else DO NOT PROCEED
Rules:
- Do NOT introduce any new claims beyond what was already present.
- Do NOT change the decision policy or the fixed scores.
- Only remove or correct the unverified claims listed.
- Keep the exact output format.
Output format (exact):
FINAL HIRING RECOMMENDATION:
Decision: PROCEED or DO NOT PROCEED
Rationale:
- Skills: 1-2 bullets referencing only the provided skills analysis and the fixed skills score.
- Culture: 1 bullet referencing only the provided culture analysis and the fixed culture score.
- Risk/Gap: 1 bullet describing the biggest missing skill or biggest concern (must be present in analyses).
Return ONLY the formatted block above. Do NOT include any other headers such as "REVISED".
"""
    )
    chain = LLMChain(llm=llm, prompt=correction_prompt)
    return chain.run({
        "original_rec": original_recommendation,
        "issues": "\n".join(verification_issues),
        "source_docs": "\n---\n".join(source_docs),
        "skills_score": skills_score,
        "culture_score": culture_score
    })

bias_audit_prompt = PromptTemplate(
    input_variables=["skills_analysis", "culture_analysis", "final_recommendation", "job_desc", "culture_docs"],
    template="""
You are auditing for bias and fairness risks in a recruiting decision-support tool.
You must evaluate reasoning in the FINAL RECOMMENDATION, in addition to the source documents.
Important:
- This tool computes skills_score and culture_score deterministically; do NOT debate scoring math or thresholds.
- Your job is to identify biased criteria, biased reasoning, or proxy signals that could lead to unfair outcomes.
- You must be strict: only flag issues if you can point to specific text in the inputs.
INPUTS (treat as the only source of truth)
JOB DESCRIPTION:
{job_desc}
CULTURE DOCUMENTS (context only):
{culture_docs}
SKILLS ANALYSIS (evidence snippets):
{skills_analysis}
CULTURE ANALYSIS (evidence snippets and inferences):
{culture_analysis}
FINAL RECOMMENDATION:
{final_recommendation}
AUDIT CHECKLIST (flag ONLY if present, and cite the exact phrase)
A) Background / pedigree / class-based signals (must flag if used positively or negatively):
- education brand, employer brand, "big-tech" preference, prestige language ("raise the bar", "top-tier")
- socioeconomic proxies (unpaid internships, elite networks, expensive certifications) IF treated as merit
B) Protected-class or proxy signals (must flag if referenced or implied):
- age, gender, family status, nationality, race/ethnicity, disability, religion, citizenship/immigration status
- proxies like "young/energetic", "native English", "cultural fit" used vaguely
C) Non-job-related preferences treated as performance criteria:
- location / remote/onsite/hybrid preferences, time zone, working hours availability
D) Subjective or exclusionary culture reasoning:
- vibe-based statements without job-performance linkage (e.g., "not a culture fit" without evidence)
- penalizing personality traits (introverted/extroverted) or communication style without job relevance
E) Inconsistent standards / moving goalposts:
- holding candidate to skills/attributes not listed as required in the job description
- treating nice-to-haves as must-haves
F) Overclaiming certainty:
- strong claims not supported by evidence snippets in the analyses
OUTPUT FORMAT (exact JSON; no extra text):
{{
  "bias_indicators": [
    {{
      "category": "A|B|C|D|E|F",
      "severity": "low|medium|high",
      "trigger_text": "exact phrase you are reacting to",
      "why_it_matters": "1 sentence",
      "recommended_fix": "1 sentence rewrite or recruiter guidance"
    }}
  ],
  "overall_assessment": "none_detected|minor_concerns|material_risk",
  "recruiter_guidance": "2-4 sentences max; must be actionable"
}}
Rules:
- If no issues are found: return bias_indicators as an empty list, set overall_assessment="none_detected", and use recruiter_guidance to briefly state why the recommendation appears fair and job-related.
- Do not invent issues. Every issue must cite trigger_text from the inputs.
"""
)

def run_bias_audit(skills_analysis, culture_analysis, final_recommendation, job_desc, culture_docs):
    chain = LLMChain(llm=llm, prompt=bias_audit_prompt)
    return chain.run({
        "skills_analysis": skills_analysis,
        "culture_analysis": culture_analysis,
        "final_recommendation": final_recommendation,
        "job_desc": job_desc,
        "culture_docs": culture_docs
    })

implied_map_prompt = PromptTemplate(
    input_variables=["job_description", "verified_required_skills"],
    template="""
Return ONLY valid JSON.
Goal:
Create a correlation map used to suggest IMPLIED (NOT SCORED) competencies when a resume likely indicates a missing required skill.
Inputs:
- Job description
- Verified required skills (quote-verified only)
Rules:
- This map is ONLY for recruiter follow-up suggestions. It MUST NOT be used for scoring.
- Each key in correlations must be one of the verified required skills (lowercase).
- Each value is a list of 2-5 SHORT signals (1-4 words each, lowercase).
- Signals must be either:
  (a) other skills from the verified_required_skills list, OR
  (b) concrete technical phrases likely to appear in resumes (e.g., "vector database", "embeddings", "feature engineering").
- Do NOT include demographic, pedigree, prestige, education, employer-brand, or class-related signals.
- Do NOT include location / remote / time zone signals.
- Do NOT invent new skills beyond the verified required skills list.
Schema:
{{
  "correlations": {{
    "information retrieval": ["rag", "embeddings", "vector database"],
    "data mining": ["feature engineering", "predictive modeling", "large-scale datasets"]
  }}
}}
Job Description:
{job_description}
Verified Required Skills (quote-verified):
{verified_required_skills}
"""
)

implied_competencies_prompt = PromptTemplate(
    input_variables=["missing_skills", "matched_skills", "resume_text", "job_description"],
    template="""
Return ONLY valid JSON.
You are assisting a recruiter by suggesting IMPLIED (NOT SCORED) competencies.
These suggestions are used ONLY to guide a phone screen and must NOT affect the skills score.
Task:
Given a candidate resume and the missing required skills, identify which missing skills may be reasonably implied by adjacent evidence.
Rules:
- Do NOT claim the candidate definitively has the missing skill.
- Only mark a missing skill as implied if you can cite 1-2 verbatim resume quotes that strongly support adjacent competence.
- Use semantic similarity: it does NOT need exact keyword matches.
- Do NOT infer tool-specific skills (e.g., R, SAS, MATLAB) unless explicitly mentioned in the resume.
- Output must be conservative: it is better to omit than to over-infer.
- Provide 1 phone-screen validation question per implied skill.
Schema:
{{
  "implied": [
    {{
      "skill": "information retrieval",
      "confidence": 1-5,
      "why_implied": "1 sentence; probabilistic wording",
      "resume_quotes": ["quote1", "quote2"],
      "phone_screen_question": "question"
    }}
  ]
}}
Job Description:
{job_description}
Matched Skills (already evidenced):
{matched_skills}
Missing Required Skills:
{missing_skills}
Resume:
{resume_text}
"""
)

def infer_implied_competencies_llm(
    missing_skills: list,
    matched_skills: list,
    resume_text: str,
    job_description: str
) -> list:
    """
    Use LLM to suggest implied (NOT SCORED) competencies based on adjacent evidence.
    Output is conservative and intended only for recruiter phone-screen follow-up.
    """
    # Guardrails
    if not missing_skills or not resume_text:
        return []
    # Extract clean matched skill names
    matched_skill_names = []
    for m in matched_skills or []:
        if isinstance(m, dict) and m.get("skill"):
            s = m.get("skill").strip()
            if s:
                matched_skill_names.append(s)
    matched_skill_names = list(dict.fromkeys(matched_skill_names))  # de-dup
    # Prepare prompt input
    missing_str = ", ".join([s.strip() for s in missing_skills if isinstance(s, str) and s.strip()])
    matched_str = ", ".join(matched_skill_names)
    if not missing_str:
        return []
    # Run LLM
    chain = LLMChain(llm=llm, prompt=implied_competencies_prompt)
    try:
        raw = chain.run({
            "missing_skills": missing_str,
            "matched_skills": matched_str,
            "resume_text": resume_text,
            "job_description": job_description
        })
    except Exception:
        return []
    # Parse JSON safely
    obj = _safe_parse_json(raw) or {}
    implied = obj.get("implied") or []
    if not isinstance(implied, list):
        return []
    resume_l = resume_text.lower()
    missing_set = set(s.lower() for s in missing_skills if isinstance(s, str))
    out = []
    for it in implied:
        if not isinstance(it, dict):
            continue
        skill = (it.get("skill") or "").strip().lower()
        confidence = it.get("confidence")
        why = (it.get("why_implied") or "").strip()
        quotes = it.get("resume_quotes") or []
        phone_q = (it.get("phone_screen_question") or "").strip()
        # Must correspond to a missing required skill
        if not skill or skill not in missing_set:
            continue
        # Confidence must be 1-5
        if not isinstance(confidence, int) or confidence < 1 or confidence > 5:
            continue
        # Explanation must be meaningful
        if len(why) < 15:
            continue
        # Must include at least one substantial verbatim quote
        if not isinstance(quotes, list):
            continue
        valid_quotes = []
        for q in quotes:
            if isinstance(q, str):
                qs = q.strip()
                if len(qs) >= 25 and qs.lower() in resume_l:
                    valid_quotes.append(qs)
        if not valid_quotes:
            continue
        # Must include usable phone-screen question
        if len(phone_q) < 15:
            continue
        # Extra guardrail: tool-like skills require explicit mention
        if skill in {"r", "sas", "matlab"}:
            if re.search(rf"(^|[^a-z0-9]){re.escape(skill)}([^a-z0-9]|$)", resume_l) is None:
                continue
        # All checks passed -> keep
        out.append({
            "skill": skill,
            "confidence": confidence,
            "why_implied": why,
            "resume_quotes": valid_quotes[:2],
            "phone_screen_question": phone_q
        })
    return out
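
# Acceptance-filter summary for infer_implied_competencies_llm: an item survives
# only if its skill is in the missing set, confidence is an int in 1-5, the
# explanation and phone-screen question are non-trivial, and at least one quote
# of 25+ characters appears verbatim in the resume. Anything else is silently
# dropped, deliberately erring toward omission over over-inference.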

def analyze_candidates(job_description: str) -> str:
    # First extract required skills from job description
    skills_prompt = PromptTemplate(
        input_variables=["job_description"],
        template="""
Return ONLY valid JSON.
Task:
Extract ONLY the REQUIRED technical skills explicitly stated as requirements in the job description.
Do NOT include nice-to-haves, examples, or implied skills.
For each required skill, include a short evidence quote copied from the job description that proves it is required.
Schema:
{{
  "required_skills": [
    {{"skill": "python", "evidence": "quote from JD"}},
    {{"skill": "kubernetes", "evidence": "quote from JD"}}
  ]
}}
Job Description:
{job_description}
"""
    )
    skills_chain = LLMChain(
        llm=llm,
        prompt=skills_prompt
    )
    skills_raw = skills_chain.run({"job_description": job_description})
    skills_json = _safe_parse_json(skills_raw)
    if not skills_json or "required_skills" not in skills_json:
        return (
            "ERROR: Could not parse required_skills JSON from LLM output.\n\n"
            f"RAW OUTPUT:\n{skills_raw}"
        )
    required_items = skills_json.get("required_skills", []) or []
    # normalize into list of dicts: {"skill": "...", "evidence": "..."}
    req = []
    for it in required_items:
        if isinstance(it, dict):
            skill = (it.get("skill") or "").strip()
            evidence = (it.get("evidence") or "").strip()
            if skill:
                req.append({"skill": skill, "evidence": evidence})
        elif isinstance(it, str):
            # backwards compatibility if model returns strings
            skill = it.strip()
            if skill:
                req.append({"skill": skill, "evidence": ""})
    # de-dup skills (preserve order)
    seen = set()
    req_dedup = []
    for it in req:
        key = it["skill"].strip().lower()
        if key in seen:
            continue
        seen.add(key)
        req_dedup.append(it)
    req = req_dedup
    req_verif = verify_required_skills(req, job_description)
    # Use only VERIFIED requirements for scoring downstream
    verified_required_skills = [it["skill"] for it in req_verif["quote_verified"]]
    # Build implied-skill correlation map ONCE per JD (not scored)
    implied_map = build_implied_skill_map(job_description, verified_required_skills)
    # Get relevant culture documents based on job description
    relevant_culture_docs = culture_store.similarity_search(
        job_description,  # Using job description to find relevant culture aspects
        k=3  # Adjust based on how many culture chunks you want to consider
    )
    culture_context = "\n".join([doc.page_content for doc in relevant_culture_docs])
    # First analyze what cultural aspects we're looking for based on the role
    culture_requirements_prompt = PromptTemplate(
        input_variables=["job_description", "culture_docs"],
        template="""
Return ONLY valid JSON.
Task:
From the culture_docs, output 4-6 CULTURE ATTRIBUTES that are most meaningful for job performance in this role.
Definition:
- A culture attribute is an observable work behavior that predicts success on the job.
- It must be assessable through work outputs and outcomes (projects, incidents, audits, decisions, reliability).
- It must be relevant to the job_description.
Hard exclusions (do NOT output these):
- work location or mode (remote-first, onsite, hybrid)
- time zone, availability, working hours
- personality traits or "vibe" descriptors (e.g., friendly, energetic, extroverted)
- demographic or personal background details
- education, employer history, or any background-based proxies
- non-job-related preferences (e.g., office attendance)
Output requirements:
- Each attribute must be 2-6 words, lowercase.
- Use generic labels (no slogans, no internal code names).
- Avoid near-duplicates.
Schema:
{{
  "cultural_attributes": ["attr1", "attr2", "attr3", "attr4"]
}}
Job Description:
{job_description}
Culture Context:
{culture_docs}
"""
    )
    culture_req_chain = LLMChain(
        llm=llm,
        prompt=culture_requirements_prompt
    )
    culture_req_raw = culture_req_chain.run({
        "job_description": job_description,
        "culture_docs": culture_context
    })
    culture_req_json = _parse_json_or_raise(culture_req_raw)
    cultural_attributes = culture_req_json.get("cultural_attributes", [])
    cultural_attributes = [c.strip() for c in cultural_attributes if isinstance(c, str) and c.strip()]
    # Query resumes
    results = resume_store.similarity_search(
        job_description,
        k=10
    )
    # Group resume chunks by resume_id
    resume_groups = {}
    for doc in results:
        resume_id = doc.metadata.get("resume_id")
        if resume_id not in resume_groups:
            resume_groups[resume_id] = []
        resume_groups[resume_id].append(doc.page_content)
    # For each resume, compare against culture docs
    consolidated_analyses = []  # Initialize empty list for all analyses
    requirements_debug = []
    requirements_debug.append("=== REQUIREMENTS VERIFICATION (Job Description) ===")
    requirements_debug.append(f"Extracted required skills (LLM): {len(req)}")
    requirements_debug.append(
        f"Verified required skills: {req_verif['verified_count']} / {req_verif['total']} (score={req_verif['factuality_score']:.2f})"
    )
    requirements_debug.append(f"- Quote-verified (strong): {req_verif['quote_verified_count']}")
    requirements_debug.append(f"- Name-only verified (weak): {req_verif['name_only_verified_count']}")
    requirements_debug.append(f"- Retracted / Dropped (unverified): {req_verif['unverified_count']}")
    if req_verif["unverified"]:
        requirements_debug.append("\nRETRACTED REQUIREMENTS (LLM claimed required, JD did not support):")
        for it in req_verif["unverified"][:20]:
            s = it.get("skill", "")
            ev = it.get("evidence", "")
            vm = it.get("verification_method", "unverified")
            if ev:
                requirements_debug.append(f"- {s} | verification_method={vm} | quote not found: {ev}")
            else:
                requirements_debug.append(f"- {s} | verification_method={vm} | no evidence quote provided")
    requirements_debug.append("\nVERIFIED REQUIRED SKILLS USED FOR SCORING (quote-only):")
    for it in req_verif["quote_verified"]:
        requirements_debug.append(f"- {it.get('skill','')}")
    requirements_debug.append("\nWEAKLY VERIFIED (name-only, not used for scoring):")
    for it in req_verif["name_only_verified"]:
        requirements_debug.append(f"- {it.get('skill','')} | evidence quote missing or not found")
    requirements_debug.append("\nIMPLIED COMPETENCIES: enabled (not scored; resume-triggered only)")
    consolidated_analyses.append("\n".join(requirements_debug))
    for resume_id, chunks in resume_groups.items():
        resume_text = "\n".join(chunks)
        # Compare this specific resume against culture docs
        culture_analysis_prompt = PromptTemplate(
            input_variables=["resume", "cultural_attributes"],
            template="""
Return ONLY valid JSON.
Task:
For each cultural attribute, determine if it is supported by the resume using either:
- DIRECT evidence: a verbatim quote describing a concrete behavior/outcome
- INFERRED evidence: a short paraphrase based on 1-2 specific resume bullets (you must cite those bullets verbatim)
Rules:
- Do NOT match an attribute based only on a job title, years of experience, or generic self-claims (e.g., "data-driven").
- Prefer DIRECT evidence when possible.
- INFERRED evidence is allowed only if you cite at least 1 verbatim bullet from the resume that clearly implies the attribute.
- For each matched attribute, include:
  - evidence_type: "direct" or "inferred"
  - evidence_quotes: 1-2 verbatim quotes from the resume (must be copied exactly)
  - inference: a one-sentence explanation (required only when evidence_type="inferred")
  - confidence: integer 1-5 (5 = very strong, 1 = weak)
Schema:
{{
  "matched": [
    {{
      "attribute": "attr",
      "evidence_type": "direct_or_inferred",
      "evidence_quotes": ["quote1", "quote2"],
      "inference": "one sentence",
      "confidence": 1
    }}
  ]
}}
Resume:
{resume}
Cultural Attributes:
{cultural_attributes}
"""
        )
        culture_chain = LLMChain(
            llm=llm,
            prompt=culture_analysis_prompt
        )
        try:
            culture_fit_raw = culture_chain.run({
                "resume": resume_text,
                "cultural_attributes": cultural_attributes
            })
            culture_fit_json = _parse_json_or_raise(culture_fit_raw)
            matched_culture_raw = culture_fit_json.get("matched", []) or []
            # Deterministic cleanup: keep only well-formed matches with quotes + reasonable confidence
            matched_culture = []
            for m in matched_culture_raw:
                if not isinstance(m, dict):
                    continue
                attr = (m.get("attribute") or "").strip()
                evidence_type = (m.get("evidence_type") or "").strip().lower()
                quotes = m.get("evidence_quotes") or []
                inference = (m.get("inference") or "").strip()
                confidence = m.get("confidence")
                if not attr:
                    continue
                if evidence_type not in {"direct", "inferred"}:
                    continue
                if not isinstance(quotes, list) or len(quotes) == 0:
                    continue
                # require at least one non-trivial quote
                quotes = [q.strip() for q in quotes if isinstance(q, str) and len(q.strip()) >= 20]
                if not quotes:
                    continue
                # confidence must be int 1-5
                if not isinstance(confidence, int) or confidence < 1 or confidence > 5:
                    continue
                # inferred requires an inference sentence
                if evidence_type == "inferred" and len(inference) < 10:
                    continue
                # Optional: enforce minimum confidence for inferred matches
                if evidence_type == "inferred" and confidence < 3:
                    continue
                matched_culture.append({
                    "attribute": attr,
                    "evidence_type": evidence_type,
                    "evidence_quotes": quotes[:2],
                    "inference": inference,
                    "confidence": confidence
                })
            # Compute missing deterministically (required - matched)
            matched_attr_names = set(a["attribute"].strip().lower() for a in matched_culture)
            required_attr_names = set(c.strip().lower() for c in cultural_attributes)
            missing_culture = sorted(list(required_attr_names - matched_attr_names))
            # compute deterministic culture score + label
            culture_score = score_culture_weighted(cultural_attributes, matched_culture)
            culture_label = label_from_score(culture_score)
            culture_lines = []
            culture_lines.append(f"Culture evidence coverage: {culture_score}% ({culture_label})")
            culture_lines.append("Scoring weights:")
            culture_lines.append("- direct evidence = 1.0, inferred evidence = 0.5")
            culture_lines.append("Matched attributes:")
            for m in matched_culture[:6]:
                attr = m.get("attribute", "")
                et = m.get("evidence_type", "")
                conf = m.get("confidence", "")
                q1 = (m.get("evidence_quotes") or [""])[0]
                inf = m.get("inference", "")
                if et == "direct":
                    culture_lines.append(f"- {attr} (direct, conf {conf}): {q1}")
                else:
                    culture_lines.append(f"- {attr} (inferred, conf {conf}): {inf} | Quote: {q1}")
            culture_lines.append(f"Missing attributes ({len(missing_culture)}):")
            for a in missing_culture:
                culture_lines.append(f"- {a}")
            culture_fit = "\n".join(culture_lines)
            # Verify culture analysis
            culture_verification = verify_analysis(
                culture_fit,
                [resume_text, culture_context, "\n".join(cultural_attributes)]
            )
            # Now analyze technical skills match
            skills_analysis_prompt = PromptTemplate(
                input_variables=["resume", "required_skills", "job_description"],
                template="""
Return ONLY valid JSON.
Given the resume and required skills list, determine which required skills are explicitly supported by evidence in the resume.
Only mark a skill as matched if you can cite a short evidence snippet from the resume.
Schema:
{{
  "matched": [{{"skill": "skill", "evidence": "snippet"}}],
  "missing": ["skill1", "skill2"]
}}
Resume:
{resume}
Required Skills:
{required_skills}
Job Description:
{job_description}
"""
            )
            skills_chain = LLMChain(
                llm=llm,
                prompt=skills_analysis_prompt
            )
            skills_fit_raw = skills_chain.run({
                "resume": resume_text,
                "required_skills": verified_required_skills,
                "job_description": job_description
            })
            skills_fit_json = _parse_json_or_raise(skills_fit_raw)
            matched_skills = skills_fit_json.get("matched", []) or []
            matched_skill_names = set([(m.get("skill", "").strip().lower()) for m in matched_skills if isinstance(m, dict) and m.get("skill")])
            required_skill_names = set([s.strip().lower() for s in verified_required_skills])
            missing_skills = sorted(list(required_skill_names - matched_skill_names))
            implied_skills = infer_implied_competencies_llm(
                missing_skills=missing_skills,
                matched_skills=matched_skills,
                resume_text=resume_text,
                job_description=job_description
            )
            skills_score = score_from_required_list(verified_required_skills, matched_skills)
            skills_label = label_from_score(skills_score)
            implied_block = ""
            if implied_skills:
                lines = []
                lines.append("\nIMPLIED (NOT SCORED) - PHONE SCREEN FOLLOW-UPS:")
                for it in implied_skills[:5]:
                    sk = it.get("skill", "")
                    conf = it.get("confidence", "")
                    why = it.get("why_implied", "")
                    quotes = it.get("resume_quotes") or []
                    q1 = quotes[0] if quotes else ""
                    phone_q = it.get("phone_screen_question", "")
                    lines.append(f"- {sk} (confidence {conf}/5): {why}")
                    if q1:
                        lines.append(f"  Evidence: {q1}")
                    if phone_q:
                        lines.append(f"  Phone screen: {phone_q}")
                lines.append(
                    "\nRecruiter note: These are probabilistic suggestions based on adjacent evidence and were NOT counted in the score. "
                    "If the candidate proceeds to a recruiter phone screen, validate these areas directly."
                )
                implied_block = "\n".join(lines)
            skills_fit = (
                f"Skills match score: {skills_score}% ({skills_label})\n"
                f"Matched skills:\n" +
                "\n".join([f"- {m.get('skill')}: {m.get('evidence', '')}" for m in matched_skills[:12]]) +
                f"\nMissing skills ({len(missing_skills)}):\n" +
                "\n".join([f"- {s}" for s in missing_skills]) +
                implied_block
            )
            # Verify skills analysis
            skills_verification = verify_analysis(
                skills_fit, [resume_text, job_description, "\n".join(verified_required_skills)]
            )
            # Create final recommendation
            final_recommendation_prompt = PromptTemplate(
                input_variables=["skills_analysis", "culture_analysis", "job_description", "skills_score", "culture_score"],
                template="""
You are an evaluator. Make a final hiring recommendation using ONLY the inputs below.
IMPORTANT:
- The scores are FIXED inputs computed by code. Do NOT change, reinterpret, or re-score them.
- Do NOT introduce new claims not present in the analyses.
Job Description:
{job_description}
Technical Skills Analysis (includes evidence snippets):
{skills_analysis}
Culture Fit Analysis (includes evidence snippets):
{culture_analysis}
Fixed Scores (do not modify):
- Skills score: {skills_score}%
- Culture score: {culture_score}%
Decision Policy (apply exactly):
1) If skills_score >= 70 -> Decision = PROCEED
2) If skills_score < 60 -> Decision = DO NOT PROCEED
3) If 60 <= skills_score < 70 -> Decision = PROCEED only if culture_score >= 70, else DO NOT PROCEED
For senior technical roles, prioritize skills over culture (policy already reflects this).
Output format (exact):
FINAL HIRING RECOMMENDATION:
Decision must be exactly one of: PROCEED or DO NOT PROCEED
Rationale:
- Skills: 1-2 bullets referencing only the provided skills analysis and the fixed skills score.
- Culture: 1 bullet referencing only the provided culture analysis and the fixed culture score.
- Risk/Gap: 1 bullet describing the biggest missing skill or biggest concern (must be present in analyses).
Keep the entire response under 200 words.
"""
            )
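            # The decision policy above, mirrored in plain Python for reference
            # (an illustrative sketch; the pipeline has the LLM apply the policy
            # textually and never calls this helper):
            #   def decide(skills_score: int, culture_score: int) -> str:
            #       if skills_score >= 70:
            #           return "PROCEED"
            #       if skills_score < 60:
            #           return "DO NOT PROCEED"
            #       return "PROCEED" if culture_score >= 70 else "DO NOT PROCEED"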
            recommendation_chain = LLMChain(
                llm=llm,
                prompt=final_recommendation_prompt
            )
            final_recommendation = recommendation_chain.run({
                "skills_analysis": skills_fit,
                "culture_analysis": culture_fit,
                "job_description": job_description,
                "skills_score": skills_score,
                "culture_score": culture_score,
            })
            # Collect all unverified claims for potential self-correction
            all_issues = (culture_verification.get("unverified_claims") or []) + (skills_verification.get("unverified_claims") or [])
            # Self-correct if there are any unverified claims (deterministic trigger)
            if all_issues:
                corrected_recommendation = self_correct_recommendation(
                    original_recommendation=final_recommendation,
                    verification_issues=all_issues,
                    source_docs=[resume_text, job_description, culture_context],
                    skills_score=skills_score,
                    culture_score=culture_score
                )
                final_recommendation = corrected_recommendation
                revision_note = "\n\n🔄 RECOMMENDATION REVISED: Removed/corrected unverified claims"
            else:
                revision_note = ""
            # Bias audit (triangulates across skills, culture, and final recommendation)
            bias_audit_raw = run_bias_audit(
                skills_analysis=skills_fit,
                culture_analysis=culture_fit,
                final_recommendation=final_recommendation,
                job_desc=job_description,
                culture_docs=culture_context
            )
            # Try to parse and pretty-print JSON for cleaner output
            try:
                bias_obj = _parse_json_or_raise(bias_audit_raw)
                bias_audit = json.dumps(bias_obj, indent=2, ensure_ascii=False)
            except Exception:
                # Fallback: show raw output if parsing fails
                bias_audit = bias_audit_raw
            # Add factuality + verification info
            verification_notes = f"""
🔎 FACT CHECK RESULTS
- Culture factuality score: {culture_verification["factuality_score"]:.2f}
- Skills factuality score: {skills_verification["factuality_score"]:.2f}
"""
            if all_issues:
                verification_notes += "\nUnverified claims detected:"
                if culture_verification.get("unverified_claims"):
                    verification_notes += "\n\nCULTURE ANALYSIS - Unverified claims:"
                    for claim in culture_verification["unverified_claims"][:3]:
                        verification_notes += f"\n✗ {claim}"
                if skills_verification.get("unverified_claims"):
                    verification_notes += "\n\nSKILLS ANALYSIS - Unverified claims:"
                    for claim in skills_verification["unverified_claims"][:3]:
                        verification_notes += f"\n✗ {claim}"
            # Append the analysis for this candidate to the consolidated analyses
            consolidated_analyses.append(f"""
=== Candidate Analysis (Resume ID: {resume_id}) ===
CULTURE FIT ANALYSIS:
{culture_fit}
TECHNICAL SKILLS ANALYSIS:
{skills_fit}
{final_recommendation}{revision_note}{verification_notes}
BIAS AUDIT:
{bias_audit}
----------------------------------------
""")
        except Exception as e:
            # If there's an error analyzing this candidate, add error message but continue with others
            consolidated_analyses.append(f"""
=== Candidate Analysis (Resume ID: {resume_id}) ===
Error analyzing candidate: {str(e)}
----------------------------------------
""")
            continue
    # Return all analyses joined together
    return "\n".join(consolidated_analyses)

def clear_databases():
    """Clear both resume and culture document databases"""
    global resume_store, culture_store
    status_messages = []
    # Clear resume store
    try:
        results = resume_store.get()
        if results and results['ids']:
            num_docs = len(results['ids'])
            resume_store._collection.delete(
                ids=results['ids']
            )
            status_messages.append(f"Cleared {num_docs} documents from resume database")
        else:
            status_messages.append("Resume database was already empty")
    except Exception as e:
        status_messages.append(f"Error clearing resume store: {e}")
    # Clear culture store
    try:
        results = culture_store.get()
        if results and results['ids']:
            num_docs = len(results['ids'])
            culture_store._collection.delete(
                ids=results['ids']
            )
            status_messages.append(f"Cleared {num_docs} documents from culture database")
        else:
            status_messages.append("Culture database was already empty")
    except Exception as e:
        status_messages.append(f"Error clearing culture store: {e}")
    return "\n".join(status_messages)

def _repo_path(filename: str) -> str:
    base = os.path.dirname(os.path.abspath(__file__))
    return os.path.join(base, filename)

def _read_docx_text(path: str) -> str:
    doc = DocxDocument(path)
    return "\n".join([p.text for p in doc.paragraphs if p.text.strip()])

def run_demo_one_click():
    # Paths to repo-root sample files
    culture_path = _repo_path("Sample Culture Document RiskAwareTech.docx")
    resume_path = _repo_path("Sample Alex_Resume.docx")
    job_path = _repo_path("Sample Job Posting Senior Compliance Platform Engineer.docx")
    # 1) Clear DBs
    clear_msg = clear_databases()
    # 2) Store demo docs (the store_* functions accept file paths as well as uploaded files)
    culture_msg = store_culture_docs([culture_path])
    resume_msg = store_resumes([resume_path])
    # 3) Load job description text and analyze
    job_text = _read_docx_text(job_path)
    analysis = analyze_candidates(job_text)
    status = (
        "✅ Demo completed.\n\n"
        f"{clear_msg}\n"
        f"{culture_msg}\n"
        f"{resume_msg}\n"
        f"Job description: {os.path.basename(job_path)}"
    )
    return status, analysis

def create_interface():
    with gr.Blocks(theme='freddyaboulton/test-blue') as app:
        app.load(fn=record_visit, inputs=None, outputs=None)
        gr.Markdown("# AI Recruiter Assistant")
        gr.Markdown("""**Purpose**
This prototype demonstrates how AI can support recruiting workflows, including candidate evaluation and outreach, while embedding safeguards for fairness, transparency, and verification.
It is designed as a **decision-support tool**, not an automated decision-maker.

⚠️ **Important Disclaimer**
This tool does **not** replace recruiter judgment, legal review, or hiring policy compliance. Final hiring decisions must always be made by humans.

💬 **Feedback Welcome**
Please share feedback, issues, or improvement ideas via the **Community** tab.
""")
        with gr.Tabs():
            # Recruiter View
            with gr.Tab("Candidate Assessment"):
                gr.Markdown("Clear existing culture documents and resumes from storage. Use this whenever you upload new company documentation or do not want to select from the existing pool of resumes.")
                clear_btn = gr.Button("Clear All Databases")
                clear_status = gr.Textbox(label="Clear Status")
                gr.Markdown("💡 Tip: A sample resume, culture document, and job description are available in the **Files** section of this Space for testing.")
                demo_one_click_btn = gr.Button("🚀 One-click Demo (Clear → Store Sample Documents → Analyze)")
                demo_one_click_status = gr.Textbox(label="Demo Run Status", lines=6)
                gr.Markdown("Use this feature to upload company culture documents (values, principles, leadership philosophy). These documents will be used to assess the cultural fit of candidates.")
                with gr.Row():
                    culture_docs_upload = gr.File(
                        label="Upload Company Culture Documents",
                        file_count="multiple"
                    )
                store_culture_btn = gr.Button("Store Culture Docs")
                culture_status = gr.Textbox(label="Status")
                gr.Markdown("Use this feature to upload resumes in bulk (Word or PDF). Each resume is anonymized before analysis. These resumes will be used to assess the technical skills and culture fit of candidates.")
                with gr.Row():
                    resume_bulk_upload = gr.File(
                        label="Upload Resumes",
                        file_count="multiple"
                    )
                store_resumes_btn = gr.Button("Store Resumes")
                resume_status = gr.Textbox(label="Status")
                with gr.Row():
                    job_desc_recruiter = gr.Textbox(
                        label="Paste the job description for the role you are hiring for.",
                        lines=20
                    )
                with gr.Row():
                    analyze_btn = gr.Button("Analyze Candidates")
                with gr.Row():
                    analysis_output = gr.Textbox(
                        label="Analysis Results",
                        lines=30
                    )
                store_culture_btn.click(
                    store_culture_docs,
                    inputs=culture_docs_upload,
                    outputs=culture_status
                )
                store_resumes_btn.click(
                    store_resumes,
                    inputs=resume_bulk_upload,
                    outputs=resume_status
                )
                analyze_btn.click(
                    analyze_candidates,
                    inputs=job_desc_recruiter,
                    outputs=analysis_output
                )
                clear_btn.click(
                    clear_databases,
                    inputs=[],
                    outputs=clear_status
                )
                demo_one_click_btn.click(
                    run_demo_one_click,
                    inputs=[],
                    outputs=[demo_one_click_status, analysis_output]
                )
                with gr.Accordion("Admin (telemetry)", open=False):
                    admin_key_input = gr.Textbox(
                        label="Admin key",
                        type="password",
                        placeholder="Enter admin key to unlock",
                    )
                    admin_panel = gr.Column(visible=False)
                    with admin_panel:
                        migrate_btn = gr.Button("One-time: migrate visits_legacy.jsonl → events/")
                        migrate_status = gr.Textbox(label="Migration status")
                        rollup_btn = gr.Button("Rebuild rollup: usage/visits.jsonl (from events)")
                        rollup_status = gr.Textbox(label="Rollup status")
                        migrate_btn.click(fn=migrate_legacy_jsonl_to_event_files, inputs=[], outputs=[migrate_status])
                        rollup_btn.click(fn=rebuild_visits_rollup_from_event_files, inputs=[], outputs=[rollup_status])

                    def _unlock_admin(user_key: str):
                        ok = bool(ADMIN_KEY) and (user_key or "") == ADMIN_KEY
                        # Show panel only if key matches
                        return gr.update(visible=ok)

                    admin_key_input.change(fn=_unlock_admin, inputs=[admin_key_input], outputs=[admin_panel])
            # Candidate View
            with gr.Tab("Cold Email Generator"):
                with gr.Row():
                    resume_upload = gr.File(label="Upload Resume")
                    job_desc_input = gr.Textbox(
                        label="Paste Job Description",
                        lines=10
                    )
                generate_btn = gr.Button("Generate Cold Email")
                email_output = gr.Textbox(
                    label="Generated Cold Email",
                    lines=10
                )
                generate_btn.click(
                    process_candidate_submission,
                    inputs=[resume_upload, job_desc_input],
                    outputs=email_output
                )
    return app

if __name__ == "__main__":
    app = create_interface()
    app.launch()