1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
"""
================================
作者:IT小章
网站:itxiaozhang.com
时间:2024年10月11日
Copyright © 2024 IT小章
================================
"""
import csv
import time
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed
from lxml import html
import os
from collections import Counter
import sys
# Request headers carrying a desktop-Chrome User-Agent string.
# NOTE(review): presumably passed to `requests` by the fetch code that is
# omitted from this listing -- confirm against the full source.
HEADERS = {
    'User-Agent': (
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
        'AppleWebKit/537.36 (KHTML, like Gecko) '
        'Chrome/91.0.4472.124 Safari/537.36'
    ),
}

# Extra CSV columns; write_to_csv() places them right after 'HMDB ID'.
# NOTE(review): these look like phrases matched against the metabolite
# description text -- confirm against the omitted scraping code.
NEW_COLUMNS = [
    "exists in all living species, ranging from bacteria to humans",
    "only found in individuals that have used or taken this drug",
    (
        "not a naturally occurring metabolite and is only found in "
        "those individuals exposed to this compound or its derivatives"
    ),
]
def check_ids(ids):
    """Inspect a sequence of ID strings for bad prefixes and repeats.

    Args:
        ids: Sequence of strings to inspect.

    Returns:
        A ``(invalid_ids, duplicates)`` tuple where

        * ``invalid_ids`` is a list of ``(position, value)`` pairs
          (positions are 1-based) for every entry that does not start
          with ``"HMDB"``;
        * ``duplicates`` maps each value that occurs more than once to
          its number of occurrences.
    """
    occurrences = Counter(ids)
    repeated = {}
    for value, total in occurrences.items():
        if total > 1:
            repeated[value] = total

    malformed = [
        (position, value)
        for position, value in enumerate(ids, start=1)
        if not value.startswith("HMDB")
    ]
    return malformed, repeated
def get_metabolite_data(hmdb_id):
    """Placeholder: the real body is omitted from this listing.

    NOTE(review): presumably downloads and parses the record for
    ``hmdb_id`` -- confirm against the full source. The only contract
    visible here is the caller's: get_metabolite_data_with_retry()
    returns this function's result unchanged and treats any raised
    ``Exception`` as a failed attempt to be retried.
    """
    # Omitted.
    pass
def get_metabolite_data_with_retry(hmdb_id, max_retries=3):
    """Call get_metabolite_data(), retrying with exponential back-off.

    Args:
        hmdb_id: Identifier forwarded unchanged to get_metabolite_data().
        max_retries: Maximum number of attempts. With a value of 0 or
            less no attempt is made and None is returned.

    Returns:
        Whatever get_metabolite_data() returns on the first successful
        attempt, or None once every attempt has raised.
    """
    for attempt in range(max_retries):
        try:
            return get_metabolite_data(hmdb_id)
        except Exception:
            # Deliberately broad: any failure of a single fetch is treated
            # as retryable so one bad ID cannot abort the whole run.
            if attempt == max_retries - 1:
                print(f"[{hmdb_id}] 所有重试尝试均失败。")
                # Final attempt: give up immediately. Sleeping here would
                # only delay the caller, since nothing is retried afterwards.
                break
            # Back off 1s, 2s, 4s, ... before the next attempt.
            time.sleep(2 ** attempt)
    return None
def write_to_csv(results, filename, mode='a'):
    """Write result rows to a UTF-8 CSV file with a fixed column layout.

    Args:
        results: Iterable of dicts keyed by the column names below; passed
            straight to ``csv.DictWriter.writerows``.
        filename: Path of the CSV file.
        mode: File mode handed to ``open()``; ``'a'`` appends (default),
            ``'w'`` truncates.

    The header row is written when the file did not exist before this call
    or when ``mode`` is ``'w'``.
    """
    columns = ['HMDB ID']
    columns.extend(NEW_COLUMNS)
    columns.extend([
        'Super Class', 'Class', 'Sub Class',
        'Disposition_source(Endogenous)', 'Endogenous(plant or animal or more)',
        'Biological Properties_Biospecimen Locations',
        'Biological Properties_Tissue Locations',
        'KEGG Compound ID', 'ChEBI ID', 'METLIN ID',
    ])

    # Must be sampled before open(): opening for write/append creates the file.
    existed_before = os.path.isfile(filename)
    needs_header = mode == 'w' or not existed_before

    with open(filename, mode, newline='', encoding='utf-8') as handle:
        sheet = csv.DictWriter(handle, fieldnames=columns)
        if needs_header:
            sheet.writeheader()
        sheet.writerows(results)
def process_ids(hmdb_ids, max_workers=5):
    """Placeholder: the real body is omitted from this listing.

    main() unpacks the return value as ``(success_count, failure_count)``,
    so the full implementation must return a 2-item sequence; this stub
    returns None and would make that unpacking fail.

    NOTE(review): ``max_workers`` is presumably the size of a thread pool
    (ThreadPoolExecutor is imported at the top of the file) -- confirm
    against the full source.
    """
    # Omitted.
    pass
def main():
    """Script entry point: read IDs from ``id.txt`` and process them.

    Steps:
        1. Read ``id.txt`` from the current directory, one ID per line.
        2. Keep entries starting with ``"HMDB"`` and drop repeats,
           preserving first-seen order.
        3. Re-create the output CSV with only its header row.
        4. Hand the IDs to process_ids() and print the totals it returns.

    Any exception is caught and printed rather than propagated, so the
    script never ends with a traceback.
    """
    try:
        try:
            # utf-8-sig transparently drops a UTF-8 BOM (common in files
            # saved by Windows editors); with the platform-default encoding
            # the BOM stays glued to the first ID and makes it fail the
            # "HMDB" prefix check.
            with open('id.txt', 'r', encoding='utf-8-sig') as f:
                # Strip each line so stray spaces/tabs neither end up in
                # an ID nor hide duplicates. Blank lines are kept (as '')
                # so the 1-based positions from check_ids() still match
                # the line numbers in the file.
                hmdb_ids = [line.strip() for line in f.read().splitlines()]
        except FileNotFoundError:
            print("错误:找不到 id.txt 文件。")
            return

        # NOTE(review): the validation results are computed but not
        # reported anywhere in the visible code -- confirm whether the
        # reporting was dropped from this listing.
        invalid_ids, duplicates = check_ids(hmdb_ids)

        valid_ids = [hmdb_id for hmdb_id in hmdb_ids if hmdb_id.startswith("HMDB")]
        # dict.fromkeys de-duplicates while keeping first-seen order.
        unique_ids = list(dict.fromkeys(valid_ids))

        # Start from a fresh file that contains only the header row.
        write_to_csv([], '代谢物数据_最终.csv', mode='w')

        success_count, failure_count = process_ids(unique_ids)
        print(f"数据提取完成。总计: {len(unique_ids)}, 成功: {success_count}, 失败: {failure_count}")
    except Exception as e:
        # Top-level boundary: report the problem instead of crashing.
        print(f"发生错误: {e}")
# Run main() only when the file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
|