1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176
| import csv import time import requests from concurrent.futures import ThreadPoolExecutor, as_completed from lxml import html import re import html as html_module import os from collections import Counter import sys
# Browser-like User-Agent so hmdb.ca serves pages to this automated client.
HEADERS = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}

# Phrases searched for verbatim in each metabolite's description text;
# each phrase becomes a yes/no column in the output CSV.
NEW_COLUMNS = [
    "exists in all living species, ranging from bacteria to humans",
    "only found in individuals that have used or taken this drug",
    "not a naturally occurring metabolite and is only found in those individuals exposed to this compound or its derivatives"
]
def check_ids(ids):
    """Validate a list of HMDB identifiers.

    Args:
        ids: Sequence of identifier strings, in file order.

    Returns:
        A tuple ``(invalid_ids, duplicates)``:
        ``invalid_ids`` is a list of ``(1-based position, id)`` pairs for
        entries that do not start with the ``"HMDB"`` prefix;
        ``duplicates`` maps each id occurring more than once to its count.
    """
    # Single pass to count occurrences, then keep only real duplicates.
    id_counts = Counter(ids)
    duplicates = {hmdb_id: count for hmdb_id, count in id_counts.items() if count > 1}
    # Comprehension instead of a manual append loop; avoid shadowing builtin `id`.
    invalid_ids = [
        (index, hmdb_id)
        for index, hmdb_id in enumerate(ids, 1)
        if not hmdb_id.startswith("HMDB")
    ]
    return invalid_ids, duplicates
def get_metabolite_data(hmdb_id):
    """Scrape one metabolite page from hmdb.ca and extract selected fields.

    Args:
        hmdb_id: HMDB accession, e.g. ``"HMDB0000001"``.

    Returns:
        dict with the id, a yes/no flag for each phrase in ``NEW_COLUMNS``
        (substring match against the page's description text), and the
        'Super Class' value ('' when the page lacks one).

    Raises:
        requests.HTTPError: on a non-2xx response.
        requests.Timeout: if the server does not respond within 30 seconds.
    """
    url = f"https://hmdb.ca/metabolites/{hmdb_id}"
    # BUG FIX: a timeout prevents a stalled connection from hanging a
    # worker thread forever (requests has no default timeout).
    response = requests.get(url, headers=HEADERS, timeout=30)
    response.raise_for_status()
    tree = html.fromstring(response.content)

    data = {'HMDB ID': hmdb_id}

    # The free-text description drives the yes/no phrase columns.
    description = tree.xpath("//td[@class='met-desc']/text()")
    description_text = description[0].strip() if description else ''
    for column in NEW_COLUMNS:
        data[column] = 'yes' if column in description_text else 'no'

    # 'Super Class' is rendered as a table row whose <th> holds the label.
    super_class = tree.xpath("//tr/th[text()='Super Class']/following-sibling::td/a/text()")
    data['Super Class'] = super_class[0].strip() if super_class else ''

    return data
def get_metabolite_data_with_retry(hmdb_id, max_retries=3):
    """Fetch metabolite data, retrying with exponential backoff.

    Args:
        hmdb_id: HMDB accession to fetch.
        max_retries: Maximum number of attempts (default 3).

    Returns:
        The data dict from ``get_metabolite_data``, or ``None`` when every
        attempt failed.
    """
    for attempt in range(max_retries):
        try:
            return get_metabolite_data(hmdb_id)
        except Exception:
            if attempt == max_retries - 1:
                # BUG FIX: give up immediately — the original slept
                # 2**(max_retries-1) seconds after the final failure too,
                # delaying for a retry that never happens.
                print(f"[{hmdb_id}] 所有重试尝试均失败。")
                return None
            # Exponential backoff before the next attempt: 1s, 2s, 4s, ...
            time.sleep(2 ** attempt)
    return None
def write_to_csv(results, filename, mode='a'):
    """Write metabolite rows to a CSV file.

    A header row is emitted when the target file does not yet exist or when
    the caller explicitly requests overwrite mode (``'w'``).

    Args:
        results: Iterable of dicts keyed by the column names below; missing
            keys are written as empty cells.
        filename: Target CSV path.
        mode: File open mode — ``'a'`` (append, default) or ``'w'``.
    """
    fieldnames = ['HMDB ID'] + NEW_COLUMNS + [
        'Super Class',
        'Class',
        'Sub Class',
        'Disposition_source(Endogenous)',
        'Endogenous(plant or animal or more)',
        'Biological Properties_Biospecimen Locations',
        'Biological Properties_Tissue Locations',
        'KEGG Compound ID',
        'ChEBI ID',
        'METLIN ID',
    ]
    # Decide about the header before opening: opening in 'a' mode creates
    # the file, so the existence check must happen first.
    need_header = mode == 'w' or not os.path.isfile(filename)
    with open(filename, mode, newline='', encoding='utf-8') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        if need_header:
            writer.writeheader()
        writer.writerows(results)
def process_ids(hmdb_ids, max_workers=5):
    """Fetch all ids concurrently, flushing results to CSV every 10 completions.

    Args:
        hmdb_ids: Iterable of HMDB accessions to process.
        max_workers: Thread-pool size for concurrent HTTP fetches.

    Returns:
        Tuple ``(success_count, failure_count)``.
    """
    results = {}
    total_ids = len(hmdb_ids)
    processed_count = 0
    success_count = 0
    failure_count = 0
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Map each future back to its id so progress lines can name it.
        future_to_id = {executor.submit(get_metabolite_data_with_retry, hmdb_id): hmdb_id for hmdb_id in hmdb_ids}
        for future in as_completed(future_to_id):
            hmdb_id = future_to_id[future]
            processed_count += 1
            try:
                data = future.result()
                if data:
                    results[hmdb_id] = data
                    status = "成功"  # "success"
                    success_count += 1
                else:
                    # Retries exhausted — the worker returned None.
                    status = "失败"  # "failure"
                    failure_count += 1
            except Exception:
                status = "失败"
                failure_count += 1
            print(f"正在处理第 {processed_count}/{total_ids} 个: {hmdb_id} - {status}")
            # Periodic checkpoint: append accumulated rows to disk, then
            # clear the buffer so each row is written exactly once.
            if processed_count % 10 == 0:
                write_to_csv(list(results.values()), '代谢物数据_最终.csv', mode='a')
                print(f"已保存到最终文件。已处理: {processed_count}, 成功: {success_count}, 失败: {failure_count}")
                results.clear()
    # Flush whatever remains after the last full batch of 10.
    if results:
        write_to_csv(list(results.values()), '代谢物数据_最终.csv', mode='a')
    return success_count, failure_count
def main():
    """Entry point: load ids from id.txt, scrape each, write results to CSV.

    Reads one HMDB id per line from ``id.txt``, reports malformed and
    duplicate ids, then processes the valid unique ids concurrently,
    writing to ``代谢物数据_最终.csv`` (header initialized up front).
    """
    try:
        if not os.path.exists('id.txt'):
            print("错误:找不到 id.txt 文件。请确保该文件与程序在同一目录下。")
            input("按回车键退出...")
            sys.exit(1)

        with open('id.txt', 'r') as f:
            hmdb_ids = f.read().splitlines()
        print(f"从id.txt加载了 {len(hmdb_ids)} 个ID")

        invalid_ids, duplicates = check_ids(hmdb_ids)
        if invalid_ids:
            print("发现以下异常ID:")
            for index, bad_id in invalid_ids:
                print(f" 第{index}行: {bad_id}")
        if duplicates:
            print("发现以下重复ID:")
            for dup_id, count in duplicates.items():
                print(f" {dup_id}: 重复{count}次")

        # Keep only well-formed ids, de-duplicated while preserving order.
        valid_ids = [hmdb_id for hmdb_id in hmdb_ids if hmdb_id.startswith("HMDB")]
        unique_ids = list(dict.fromkeys(valid_ids))
        print(f"将处理 {len(unique_ids)} 个有效且唯一的ID")
    except Exception as e:
        print(f"发生错误: {e}")
        # BUG FIX: the original fell through after printing the error and
        # crashed with NameError on `unique_ids`; exit cleanly instead.
        sys.exit(1)
    finally:
        # Initialize the output file with just the header row (overwrite).
        write_to_csv([], '代谢物数据_最终.csv', mode='w')

    success_count, failure_count = process_ids(unique_ids)

    print(f"数据提取完成。总计: {len(unique_ids)}, 成功: {success_count}, 失败: {failure_count}")
    print(f"最终结果已保存在代谢物数据_最终.csv中")

    print("按回车键退出程序...")
    try:
        input()
    except EOFError:
        # Allow non-interactive runs (e.g. piped stdin) to exit quietly.
        pass
# Standard script guard: run main() only when executed directly, not on import.
if __name__ == "__main__":
    main()
|