#!/usr/bin/env python3 """ Script to perform functional analysis of genes with DMLs using the annotation file. """ import pandas as pd import re from collections import Counter, defaultdict import matplotlib.pyplot as plt import seaborn as sns def parse_annotation_file(annotation_file): """Parse the annotation file and extract functional information.""" print("Parsing annotation file...") # Read the annotation file df = pd.read_csv(annotation_file, sep='\t', low_memory=False) print(f"Loaded {len(df)} gene annotations") # Clean up column names df.columns = ['GeneID', 'NR_des', 'NT_des', 'eggNOG', 'Pfam_des', 'Swissprot_des', 'COG', 'ko', 'KO', 'KEGG_des', 'GO'] # Create a dictionary for quick lookup - handle both with and without .1 suffix annotation_dict = {} for _, row in df.iterrows(): gene_id = row['GeneID'] # Store with original ID annotation_dict[gene_id] = { 'NR_des': str(row['NR_des']) if pd.notna(row['NR_des']) else '', 'NT_des': str(row['NT_des']) if pd.notna(row['NT_des']) else '', 'eggNOG': str(row['eggNOG']) if pd.notna(row['eggNOG']) else '', 'Pfam_des': str(row['Pfam_des']) if pd.notna(row['Pfam_des']) else '', 'Swissprot_des': str(row['Swissprot_des']) if pd.notna(row['Swissprot_des']) else '', 'COG': str(row['COG']) if pd.notna(row['COG']) else '', 'ko': str(row['ko']) if pd.notna(row['ko']) else '', 'KO': str(row['KO']) if pd.notna(row['KO']) else '', 'KEGG_des': str(row['KEGG_des']) if pd.notna(row['KEGG_des']) else '', 'GO': str(row['GO']) if pd.notna(row['GO']) else '' } # Also store without .1 suffix for matching if gene_id.endswith('.1'): base_id = gene_id[:-2] # Remove .1 annotation_dict[base_id] = annotation_dict[gene_id] print(f"Created annotation lookup for {len(annotation_dict)} gene IDs") return annotation_dict def extract_go_terms(go_string): """Extract GO terms from GO string.""" if not go_string or go_string == 'nan' or go_string == '': return [] # Extract GO terms using regex go_terms = re.findall(r'GO:\d+', go_string) return go_terms def extract_kegg_pathways(kegg_string): """Extract KEGG pathway information.""" if not kegg_string or kegg_string == 'nan' or kegg_string == '': return [] # Extract KEGG pathway IDs kegg_pathways = re.findall(r'crg:\d+', kegg_string) return kegg_pathways def extract_pfam_domains(pfam_string): """Extract Pfam domain information.""" if not pfam_string or pfam_string == 'nan' or pfam_string == '': return [] # Extract Pfam domain IDs pfam_domains = re.findall(r'PF\d+\.\d+', pfam_string) return pfam_domains def perform_functional_analysis(genes_with_dmls_file, annotation_dict, output_prefix): """Perform functional analysis of genes with DMLs.""" print("Reading genes with DMLs...") dml_genes = pd.read_csv(genes_with_dmls_file, sep='\t') print(f"Found {len(dml_genes)} DML-gene associations") # Get unique gene IDs unique_genes = dml_genes['gene_id'].unique() print(f"Unique genes: {len(unique_genes)}") # Initialize results functional_data = [] # Process each gene print("Analyzing functional annotations...") for gene_id in unique_genes: if gene_id in annotation_dict: annotation = annotation_dict[gene_id] # Extract functional information go_terms = extract_go_terms(annotation['GO']) kegg_pathways = extract_kegg_pathways(annotation['KEGG_des']) pfam_domains = extract_pfam_domains(annotation['Pfam_des']) # Get methylation data for this gene gene_dmls = dml_genes[dml_genes['gene_id'] == gene_id] avg_meth_diff = gene_dmls['dml_meth_diff'].mean() num_dmls = len(gene_dmls) functional_data.append({ 'gene_id': gene_id, 'gene_name': gene_dmls.iloc[0]['gene_name'], 'chr': gene_dmls.iloc[0]['chr'], 'num_dmls': num_dmls, 'avg_meth_diff': avg_meth_diff, 'NR_des': annotation['NR_des'], 'NT_des': annotation['NT_des'], 'eggNOG': annotation['eggNOG'], 'Pfam_des': annotation['Pfam_des'], 'Swissprot_des': annotation['Swissprot_des'], 'COG': annotation['COG'], 'ko': annotation['ko'], 'KO': annotation['KO'], 'KEGG_des': annotation['KEGG_des'], 'GO': annotation['GO'], 'go_terms': go_terms, 'kegg_pathways': kegg_pathways, 'pfam_domains': pfam_domains }) else: print(f"Warning: No annotation found for gene {gene_id}") # Convert to DataFrame functional_df = pd.DataFrame(functional_data) if len(functional_df) == 0: print("No functional annotations found. Exiting.") return # Save detailed results functional_df.to_csv(f'{output_prefix}_functional_annotations.tsv', sep='\t', index=False) # Perform GO term analysis perform_go_analysis(functional_df, output_prefix) # Perform KEGG pathway analysis perform_kegg_analysis(functional_df, output_prefix) # Perform Pfam domain analysis perform_pfam_analysis(functional_df, output_prefix) # Generate summary statistics generate_summary_statistics(functional_df, output_prefix) print(f"Functional analysis complete. Results saved with prefix: {output_prefix}") def perform_go_analysis(functional_df, output_prefix): """Analyze GO terms in genes with DMLs.""" print("Analyzing GO terms...") # Collect all GO terms all_go_terms = [] for go_list in functional_df['go_terms']: all_go_terms.extend(go_list) # Count GO terms go_counts = Counter(all_go_terms) # Get top GO terms top_go_terms = go_counts.most_common(50) # Save GO analysis go_df = pd.DataFrame(top_go_terms, columns=['GO_Term', 'Count']) go_df.to_csv(f'{output_prefix}_go_terms.tsv', sep='\t', index=False) # Create GO term visualization plt.figure(figsize=(12, 8)) top_20 = go_counts.most_common(20) terms, counts = zip(*top_20) plt.barh(range(len(terms)), counts) plt.yticks(range(len(terms)), terms) plt.xlabel('Number of Genes') plt.title('Top 20 GO Terms in Genes with DMLs') plt.gca().invert_yaxis() plt.tight_layout() plt.savefig(f'{output_prefix}_go_terms.png', dpi=300, bbox_inches='tight') plt.close() print(f"GO analysis complete. Found {len(go_counts)} unique GO terms") def perform_kegg_analysis(functional_df, output_prefix): """Analyze KEGG pathways in genes with DMLs.""" print("Analyzing KEGG pathways...") # Collect all KEGG pathways all_kegg_pathways = [] for kegg_list in functional_df['kegg_pathways']: all_kegg_pathways.extend(kegg_list) # Count KEGG pathways kegg_counts = Counter(all_kegg_pathways) if kegg_counts: # Save KEGG analysis kegg_df = pd.DataFrame(kegg_counts.most_common(), columns=['KEGG_Pathway', 'Count']) kegg_df.to_csv(f'{output_prefix}_kegg_pathways.tsv', sep='\t', index=False) # Create KEGG pathway visualization plt.figure(figsize=(10, 6)) top_15 = kegg_counts.most_common(15) pathways, counts = zip(*top_15) plt.bar(range(len(pathways)), counts) plt.xticks(range(len(pathways)), pathways, rotation=45, ha='right') plt.ylabel('Number of Genes') plt.title('Top 15 KEGG Pathways in Genes with DMLs') plt.tight_layout() plt.savefig(f'{output_prefix}_kegg_pathways.png', dpi=300, bbox_inches='tight') plt.close() print(f"KEGG analysis complete. Found {len(kegg_counts)} unique pathways") else: print("No KEGG pathways found in the data") def perform_pfam_analysis(functional_df, output_prefix): """Analyze Pfam domains in genes with DMLs.""" print("Analyzing Pfam domains...") # Collect all Pfam domains all_pfam_domains = [] for pfam_list in functional_df['pfam_domains']: all_pfam_domains.extend(pfam_list) # Count Pfam domains pfam_counts = Counter(all_pfam_domains) if pfam_counts: # Save Pfam analysis pfam_df = pd.DataFrame(pfam_counts.most_common(), columns=['Pfam_Domain', 'Count']) pfam_df.to_csv(f'{output_prefix}_pfam_domains.tsv', sep='\t', index=False) # Create Pfam domain visualization plt.figure(figsize=(12, 8)) top_20 = pfam_counts.most_common(20) domains, counts = zip(*top_20) plt.bar(range(len(domains)), counts) plt.xticks(range(len(domains)), domains, rotation=45, ha='right') plt.ylabel('Number of Genes') plt.title('Top 20 Pfam Domains in Genes with DMLs') plt.tight_layout() plt.savefig(f'{output_prefix}_pfam_domains.png', dpi=300, bbox_inches='tight') plt.close() print(f"Pfam analysis complete. Found {len(pfam_counts)} unique domains") else: print("No Pfam domains found in the data") def generate_summary_statistics(functional_df, output_prefix): """Generate summary statistics for the functional analysis.""" print("Generating summary statistics...") # Basic statistics total_genes = len(functional_df) genes_with_go = len(functional_df[functional_df['go_terms'].apply(len) > 0]) genes_with_kegg = len(functional_df[functional_df['kegg_pathways'].apply(len) > 0]) genes_with_pfam = len(functional_df[functional_df['pfam_domains'].apply(len) > 0]) # Methylation statistics meth_diff_stats = functional_df['avg_meth_diff'].describe() # Create summary report summary = f"""# Functional Analysis Summary - Genes with DMLs ## Overview - **Total genes analyzed**: {total_genes} - **Genes with GO terms**: {genes_with_go} ({genes_with_go/total_genes*100:.1f}%) - **Genes with KEGG pathways**: {genes_with_kegg} ({genes_with_kegg/total_genes*100:.1f}%) - **Genes with Pfam domains**: {genes_with_pfam} ({genes_with_pfam/total_genes*100:.1f}%) ## Methylation Statistics - **Mean methylation difference**: {meth_diff_stats['mean']:.2f}% - **Median methylation difference**: {meth_diff_stats['50%']:.2f}% - **Min methylation difference**: {meth_diff_stats['min']:.2f}% - **Max methylation difference**: {meth_diff_stats['max']:.2f}% - **Standard deviation**: {meth_diff_stats['std']:.2f}% ## Files Generated 1. **{output_prefix}_functional_annotations.tsv** - Complete functional annotations 2. **{output_prefix}_go_terms.tsv** - GO term analysis 3. **{output_prefix}_kegg_pathways.tsv** - KEGG pathway analysis 4. **{output_prefix}_pfam_domains.tsv** - Pfam domain analysis 5. **{output_prefix}_go_terms.png** - GO term visualization 6. **{output_prefix}_kegg_pathways.png** - KEGG pathway visualization 7. **{output_prefix}_pfam_domains.png** - Pfam domain visualization ## Key Findings - Genes with DMLs show diverse functional annotations - GO terms provide insights into biological processes affected by methylation - KEGG pathways reveal metabolic and signaling pathways involved - Pfam domains indicate protein families and structural motifs affected """ # Save summary with open(f'{output_prefix}_summary.md', 'w') as f: f.write(summary) print("Summary statistics generated") def main(): import argparse parser = argparse.ArgumentParser(description='Perform functional analysis of genes with DMLs') parser.add_argument('--genes_with_dmls', required=True, help='Genes with DMLs file (TSV)') parser.add_argument('--annotation', required=True, help='Annotation file (TSV)') parser.add_argument('--output', required=True, help='Output prefix for results') args = parser.parse_args() # Parse annotation file annotation_dict = parse_annotation_file(args.annotation) # Perform functional analysis perform_functional_analysis(args.genes_with_dmls, annotation_dict, args.output) if __name__ == "__main__": main()