import csv
import json
import os
import re
import threading
import time
from collections import Counter
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime

import psutil
import requests
from lxml import html
HEADERS = {"User-Agent": "Mozilla/5.0 ..."}
MAX_WORKERS = 5                    # concurrent download threads
REQUESTS_PER_SECOND = 5            # shared rate limit across all threads
BATCH_WRITE_SIZE = 500             # flush results to CSV every N metabolites
PROGRESS_REPORT_INTERVAL = 100     # print a status line every N completed IDs
REQUEST_TIMEOUT = (5, 15)          # (connect, read) timeout in seconds
MAX_RETRIES = 2
class RateLimiter:
    """Simple token-bucket limiter shared by all worker threads."""
    def __init__(self, rate):
        self.rate = rate              # tokens replenished per second
        self.tokens = rate
        self.last_update = time.time()
        self.lock = threading.Lock()

    def acquire(self):
        """Return True if a request may proceed now, otherwise the seconds to wait."""
        with self.lock:
            now = time.time()
            self.tokens = min(self.rate, self.tokens + (now - self.last_update) * self.rate)
            self.last_update = now
            if self.tokens >= 1:
                self.tokens -= 1
                return True
            return (1 - self.tokens) / self.rate

rate_limiter = RateLimiter(REQUESTS_PER_SECOND)
def check_ids(ids):
    """Return (invalid, duplicates): entries not starting with 'HMDB', and IDs seen more than once."""
    counts = Counter(ids)
    duplicates = {i: c for i, c in counts.items() if c > 1}
    invalid = [(idx, i) for idx, i in enumerate(ids, 1) if not i.startswith("HMDB")]
    return invalid, duplicates
def clean_text(s):
    """Strip control and zero-width characters, then collapse whitespace."""
    s = re.sub(r'[\x00-\x1F\u200B-\u200D\uFEFF]', '', s or '')
    s = re.sub(r'\s+', ' ', s)
    return s.strip()
def fetch(hmdb_id):
    """Download the metabolite page, blocking until the rate limiter allows the request."""
    url = f"https://hmdb.ca/metabolites/{hmdb_id}"
    while True:
        wait = rate_limiter.acquire()
        if wait is True:
            break
        time.sleep(wait)
    resp = requests.get(url, headers=HEADERS, timeout=REQUEST_TIMEOUT)
    resp.raise_for_status()
    return html.fromstring(resp.content)
def parse(tree, hmdb_id):
    data = {"HMDB_ID": hmdb_id}
    # Key field parsing
    cf = tree.xpath('//th[text()="Chemical Formula"]/following-sibling::td[1]')
    data["Chemical_Formula"] = ''.join(cf[0].xpath('.//text()')) if cf else ""
    # Remaining fields: Average/Monoisotopic Weight, IUPAC, CAS, SMILES, taxonomy levels,
    # properties, pathways, concentrations, disease references, external IDs...
    # ...
    return data
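
# A minimal sketch of how the elided fields above could be extracted with the same
# th/td pattern used for Chemical Formula. The helper name `extract_field` and the
# exact label strings in the usage example are assumptions about HMDB's page layout,
# not part of the original script.
def extract_field(tree, label):
    cells = tree.xpath(f'//th[text()="{label}"]/following-sibling::td[1]')
    return clean_text(''.join(cells[0].xpath('.//text()'))) if cells else ""

# Possible usage inside parse(), under the same assumptions:
#     data["Average_Molecular_Weight"] = extract_field(tree, "Average Molecular Weight")
#     data["SMILES"] = extract_field(tree, "SMILES")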
def should_retry(exc, status=None):
    """Retry on network errors, 429 and 5xx; give up on client errors (400/403/404)."""
    if isinstance(exc, requests.exceptions.Timeout):
        return True
    if isinstance(exc, requests.exceptions.ConnectionError):
        return True
    if isinstance(exc, requests.exceptions.HTTPError):
        if status in (429, 502, 503, 504):
            return True
        if status in (400, 403, 404):
            return False
        return True
    return True
def get_with_retry(hmdb_id, retries=MAX_RETRIES):
    last = None
    for attempt in range(retries):
        try:
            tree = fetch(hmdb_id)
            return parse(tree, hmdb_id)
        except requests.exceptions.HTTPError as e:
            last = e
            status = e.response.status_code if e.response is not None else None
            if not should_retry(e, status):
                break
            if attempt < retries - 1:
                wait = 2 ** attempt  # exponential backoff
                ra = e.response.headers.get('Retry-After') if e.response is not None else None
                if ra:
                    try:
                        wait = max(wait, int(ra))  # honour the server's Retry-After if present
                    except ValueError:
                        pass
                time.sleep(wait)
        except Exception as e:
            last = e
            if not should_retry(e):
                break
            if attempt < retries - 1:
                time.sleep(2 ** attempt)
    print(f"[{hmdb_id}] all retries failed: {last}")
    return None
def mem_mb():
    """Current process RSS in MB (0 if psutil cannot report it)."""
    try:
        return psutil.Process().memory_info().rss / 1024 / 1024
    except Exception:
        return 0
def write_csv(rows, file, mode='a'):
    cols = [
        'HMDB_ID', 'Common_Name', 'Description', 'Synonyms', 'Chemical_Formula',
        'Average_Molecular_Weight', 'Monoisotopic_Molecular_Weight', 'IUPAC_Name',
        'Traditional_Name', 'CAS_Registry_Number', 'SMILES', 'Kingdom', 'Super_Class',
        'Class', 'Sub_Class', 'Direct_Parent', 'Experimental_Molecular_Properties',
        'Predicted_Molecular_Properties', 'Pathways', 'Normal_Concentrations',
        'Abnormal_Concentrations', 'Disease_References', 'Associated_OMIM_IDs',
        'KEGG_Compound_ID', 'PubChem_Compound'
    ]
    exists = os.path.isfile(file)
    with open(file, mode, newline='', encoding='utf-8-sig') as f:
        w = csv.DictWriter(f, fieldnames=cols, quoting=csv.QUOTE_ALL)
        if not exists or mode == 'w':
            w.writeheader()
        w.writerows(rows)
def save_failed(failed, file='failed.txt'):
    with open(file, 'w', encoding='utf-8') as f:
        for i, err in failed:
            f.write(f"{i}\t{err}\n")
def save_progress(done, file='progress.json'):
    data = {'processed_ids': list(done), 'timestamp': datetime.now().isoformat(), 'count': len(done)}
    with open(file, 'w', encoding='utf-8') as f:
        json.dump(data, f, ensure_ascii=False, indent=2)

def load_progress(file='progress.json'):
    if os.path.exists(file):
        try:
            with open(file, 'r', encoding='utf-8') as f:
                return set(json.load(f).get('processed_ids', []))
        except (OSError, json.JSONDecodeError):
            pass
    return set()
def process_ids(hmdb_ids):
    done = load_progress()
    remain = [i for i in hmdb_ids if i not in done]
    if not remain:
        print("All IDs have already been processed!")
        return len(hmdb_ids), 0, []
    results, failed = {}, []
    total, curr, ok, bad = len(hmdb_ids), len(done), len(done), 0
    start = time.time()
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as ex:
        fut = {ex.submit(get_with_retry, i): i for i in remain}
        for f in as_completed(fut):
            i = fut[f]
            curr += 1
            try:
                d = f.result()
                if d:
                    results[i] = d
                    done.add(i)
                    ok += 1
                else:
                    failed.append((i, "fetch returned no data"))
                    bad += 1
            except Exception as e:
                failed.append((i, str(e)))
                bad += 1
            if curr % PROGRESS_REPORT_INTERVAL == 0 or curr == total:
                left = total - curr
                pct = curr / total * 100
                print(f"Progress: {curr}/{total} ({pct:.1f}%) | remaining: {left} | succeeded: {ok} | failed: {bad} | memory: {mem_mb():.1f}MB")
            if len(results) >= BATCH_WRITE_SIZE:
                # Flush completed rows in original input order to keep the CSV roughly ordered.
                ordered = []
                for _id in hmdb_ids:
                    if _id in results:
                        ordered.append(results.pop(_id))
                if ordered:
                    write_csv(ordered, 'metabolites_final.csv', 'a')
                save_progress(done)
    if results:
        ordered = [results[_id] for _id in hmdb_ids if _id in results]
        if ordered:
            write_csv(ordered, 'metabolites_final.csv', 'a')
    if failed:
        save_failed(failed)
    save_progress(done)
    return ok, bad, failed
def sort_by_original_order(original_ids, csv_file):
    """Rewrite the CSV so rows follow the order of the original ID list (requires pandas)."""
    try:
        import pandas as pd
        df = pd.read_csv(csv_file, encoding='utf-8-sig')
        if df.empty or 'HMDB_ID' not in df.columns:
            return
        order = {i: idx for idx, i in enumerate(original_ids)}
        df['sort_key'] = df['HMDB_ID'].map(order)
        df.sort_values('sort_key').drop('sort_key', axis=1).to_csv(csv_file, index=False, encoding='utf-8-sig')
    except ImportError:
        print("Note: pandas is not installed; skipping the final sorting step.")
    except Exception as e:
        print(f"Sorting failed: {e}")
def main():
    print("=" * 80)
    print("HMDB metabolite data extraction tool")
    print("=" * 80)
    if not os.path.exists('id.txt'):
        print("Error: id.txt is missing")
        input("Press Enter to exit...")
        return
    with open('id.txt', 'r', encoding='utf-8') as f:
        ids = [line.strip() for line in f if line.strip()]
    invalid, duplicates = check_ids(ids)
    if invalid:
        print(f"Warning: {len(invalid)} entries do not start with 'HMDB' and will be skipped")
    if duplicates:
        print(f"Warning: {len(duplicates)} IDs are duplicated and will be processed only once")
    valid = [i for i in ids if i.startswith("HMDB")]
    unique = list(dict.fromkeys(valid))  # de-duplicate while preserving order
    if not os.path.exists('progress.json'):
        # Fresh run: create the output CSV with just the header row.
        write_csv([], 'metabolites_final.csv', 'w')
    ok, bad, failed = process_ids(unique)
    print("\nReordering the CSV to match the original ID order...")
    sort_by_original_order(unique, 'metabolites_final.csv')
    print("=" * 80)
    print(f"Total: {len(unique)} | succeeded: {ok} | failed: {bad}")
    print("Output: metabolites_final.csv")
    print("=" * 80)

if __name__ == "__main__":
    main()
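
# For reference: id.txt is expected to hold one HMDB accession per line, for example
# (these two IDs are purely illustrative):
#
#   HMDB0000001
#   HMDB0000002
#
# Outputs: metabolites_final.csv (extracted data), failed.txt (IDs that could not be
# fetched), and progress.json (processed IDs, so an interrupted run can be resumed).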