Original post: https://itxiaozhang.com/hmdb-metabolite-batch-extraction-csv-export/
## Requirements

- Automate retrieval of the complete field set for multiple HMDB IDs, eliminating manual searching and the omissions that come with it.
- Stable and reliable: handles rate limiting and network hiccups without aborting, and can resume an interrupted run.
- Usable output: a unified field set in the original input order, ready for downstream analysis and reproduction.

## How the Program Runs

### Preparation
Put the IDs to process in id.txt (e.g. HMDB0000123), one per line.
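For example (HMDB0000123 comes from the text above; the other IDs are arbitrary placeholders):

```
HMDB0000123
HMDB0000001
HMDB0000190
```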
Install the dependencies:
```
pip install requests lxml psutil pandas
```

### Run
```
python HMDB_Metabolite_Extractor.py
```
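Runs are checkpointed: if the process is interrupted, re-running the same command resumes from progress.json, and deleting progress.json forces a fresh run (the script then rewrites the CSV header). A minimal sketch, assuming a Unix-like shell:

```
# resume an interrupted run (progress.json is picked up automatically)
python HMDB_Metabolite_Extractor.py

# start over from scratch
rm progress.json
python HMDB_Metabolite_Extractor.py
```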
### Results

- Data: 代谢物数据_最终.csv (rows in the original ID order).
- Failures: 失败.txt (for follow-up backfill runs).
- Progress: progress.json (checkpoint for resuming).

## Overall Architecture

- I/O layer: reads id.txt; writes 代谢物数据_最终.csv; persists failures and progress.
- Fetch layer: requests plus a global RateLimiter to cap QPS; unified timeout and retry policy.
- Parse layer: lxml with fine-grained XPath expressions covering 25 key fields (name, chemical formula, molecular weight, taxonomy levels, properties, pathways, concentrations, disease references, external IDs, etc.).
- Concurrency layer: ThreadPoolExecutor; batched writes to keep memory low; progress reporting and time estimates.
- Ordering and wrap-up: re-sorts rows by the original ID order; cleans up temporary files; prints summary statistics.

## Core Code Framework

Configuration, rate limiting, fetching, and parsing:

```python
import time, csv, os, re, json, threading
from collections import Counter
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime

import requests
from lxml import html
import psutil

HEADERS = {"User-Agent": "Mozilla/5.0 ..."}
MAX_WORKERS = 5                  # concurrent fetch threads
REQUESTS_PER_SECOND = 5          # global QPS cap
BATCH_WRITE_SIZE = 500           # rows buffered before each CSV flush
PROGRESS_REPORT_INTERVAL = 100   # print progress every N IDs
REQUEST_TIMEOUT = (5, 15)        # (connect, read) timeouts in seconds
MAX_RETRIES = 2


class RateLimiter:
    """Token-bucket limiter shared by all worker threads."""

    def __init__(self, rate):
        self.rate = rate
        self.tokens = rate
        self.last_update = time.time()
        self.lock = threading.Lock()

    def acquire(self):
        # Returns True if a token was taken, otherwise the seconds to wait.
        with self.lock:
            now = time.time()
            self.tokens = min(self.rate, self.tokens + (now - self.last_update) * self.rate)
            self.last_update = now
            if self.tokens >= 1:
                self.tokens -= 1
                return True
            return (1 - self.tokens) / self.rate


rate_limiter = RateLimiter(REQUESTS_PER_SECOND)


def check_ids(ids):
    # Flag malformed IDs and report duplicates.
    invalid, dup = [], Counter(ids)
    duplicates = {i: c for i, c in dup.items() if c > 1}
    for idx, i in enumerate(ids, 1):
        if not i.startswith("HMDB"):
            invalid.append((idx, i))
    return invalid, duplicates


def clean_text(s):
    # Strip control characters and zero-width spaces, collapse whitespace.
    s = re.sub(r'[\x00-\x1F\u200B-\u200D\uFEFF]', '', s or '')
    s = re.sub(r'\s+', ' ', s)
    return s.strip()


def fetch(hmdb_id):
    url = f"https://hmdb.ca/metabolites/{hmdb_id}"
    wait = rate_limiter.acquire()
    if wait is not True:          # no token available: sleep, then take one
        time.sleep(wait)
        rate_limiter.acquire()
    resp = requests.get(url, headers=HEADERS, timeout=REQUEST_TIMEOUT)
    resp.raise_for_status()
    return html.fromstring(resp.content)


def parse(tree, hmdb_id):
    data = {"HMDB_ID": hmdb_id}
    # Key field parsing
    cf = tree.xpath('//th[text()="Chemical Formula"]/following-sibling::td[1]')
    data["Chemical_Formula"] = ''.join(cf[0].xpath('.//text()')) if cf else ""
    # Remaining fields: Average/Monoisotopic Weight, IUPAC, CAS, SMILES,
    # taxonomy levels, properties, pathways, concentrations, disease
    # references, external IDs...
    # ...
    return data
```
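The listing elides the remaining field extractions. As a rough sketch of how the same th/td XPath pattern could be generalized (the helper name and the exact label strings are assumptions about HMDB's page layout, not taken from the original):

```python
# Hypothetical helper, mirroring the Chemical Formula extraction above;
# assumes HMDB renders simple properties as <th>label</th><td>value</td> rows.
def parse_simple_field(tree, label):
    cells = tree.xpath(f'//th[text()="{label}"]/following-sibling::td[1]')
    return clean_text(''.join(cells[0].xpath('.//text()'))) if cells else ""

# e.g. data["Average_Molecular_Weight"] = parse_simple_field(tree, "Average Molecular Weight")
```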
Retry policy, persistence helpers, and the concurrent processing loop:

```python
def should_retry(exc, status=None):
    # Retry on timeouts, connection errors, and transient HTTP statuses;
    # give up immediately on definitive client errors.
    if isinstance(exc, requests.exceptions.Timeout):
        return True
    if isinstance(exc, requests.exceptions.ConnectionError):
        return True
    if isinstance(exc, requests.exceptions.HTTPError):
        if status in [429, 502, 503, 504]:
            return True
        if status in [400, 403, 404]:
            return False
        return True
    return True


def get_with_retry(hmdb_id, retries=MAX_RETRIES):
    last = None
    for attempt in range(retries):
        try:
            tree = fetch(hmdb_id)
            return parse(tree, hmdb_id)
        except requests.exceptions.HTTPError as e:
            # Note: "is not None" matters here; a Response is falsy for 4xx/5xx.
            last, status = e, (e.response.status_code if e.response is not None else None)
            if not should_retry(e, status):
                break
            if attempt < retries - 1:
                wait = 2 ** attempt  # exponential backoff
                ra = e.response.headers.get('Retry-After') if e.response is not None else None
                if ra:
                    try:
                        wait = max(wait, int(ra))
                    except ValueError:
                        pass
                time.sleep(wait)
        except Exception as e:
            last = e
            if not should_retry(e):
                break
            if attempt < retries - 1:
                time.sleep(2 ** attempt)
    print(f"[{hmdb_id}] retries exhausted: {last}")
    return None


def mem_mb():
    # Resident memory of this process, for the progress report.
    try:
        return psutil.Process().memory_info().rss / 1024 / 1024
    except Exception:
        return 0


def write_csv(rows, file, mode='a'):
    cols = [
        'HMDB_ID', 'Common_Name', 'Description', 'Synonyms', 'Chemical_Formula',
        'Average_Molecular_Weight', 'Monoisotopic_Molecular_Weight', 'IUPAC_Name',
        'Traditional_Name', 'CAS_Registry_Number', 'SMILES', 'Kingdom', 'Super_Class',
        'Class', 'Sub_Class', 'Direct_Parent', 'Experimental_Molecular_Properties',
        'Predicted_Molecular_Properties', 'Pathways', 'Normal_Concentrations',
        'Abnormal_Concentrations', 'Disease_References', 'Associated_OMIM_IDs',
        'KEGG_Compound_ID', 'PubChem_Compound'
    ]
    exists = os.path.isfile(file)
    with open(file, mode, newline='', encoding='utf-8-sig') as f:
        w = csv.DictWriter(f, fieldnames=cols, quoting=csv.QUOTE_ALL)
        if not exists or mode == 'w':
            w.writeheader()
        w.writerows(rows)


def save_failed(failed, file='失败.txt'):
    with open(file, 'w', encoding='utf-8') as f:
        for i, err in failed:
            f.write(f"{i}\t{err}\n")


def save_progress(done, file='progress.json'):
    data = {'processed_ids': list(done),
            'timestamp': datetime.now().isoformat(),
            'count': len(done)}
    with open(file, 'w', encoding='utf-8') as f:
        json.dump(data, f, ensure_ascii=False, indent=2)


def load_progress(file='progress.json'):
    if os.path.exists(file):
        try:
            with open(file, 'r', encoding='utf-8') as f:
                return set(json.load(f).get('processed_ids', []))
        except Exception:
            pass
    return set()


def process_ids(hmdb_ids):
    done = load_progress()
    remain = [i for i in hmdb_ids if i not in done]
    if not remain:
        print("All IDs have already been processed.")
        return len(hmdb_ids), 0, []

    results, failed = {}, []
    total, curr, ok, bad = len(hmdb_ids), len(done), len(done), 0
    start = time.time()  # start time, for ETA estimation

    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as ex:
        fut = {ex.submit(get_with_retry, i): i for i in remain}
        for f in as_completed(fut):
            i = fut[f]
            curr += 1
            try:
                d = f.result()
                if d:
                    results[i] = d
                    done.add(i)
                    ok += 1
                else:
                    failed.append((i, "fetch failed"))
                    bad += 1
            except Exception as e:
                failed.append((i, str(e)))
                bad += 1

            if curr % PROGRESS_REPORT_INTERVAL == 0 or curr == total:
                left = total - curr
                pct = curr / total * 100
                print(f"Progress: {curr}/{total} ({pct:.1f}%) | left: {left} | "
                      f"ok: {ok} | failed: {bad} | mem: {mem_mb():.1f}MB")

            # Flush a batch in original-ID order to bound memory use.
            if len(results) >= BATCH_WRITE_SIZE:
                ordered = []
                for _id in hmdb_ids:
                    if _id in results:
                        ordered.append(results[_id])
                        del results[_id]
                if ordered:
                    write_csv(ordered, '代谢物数据_最终.csv', 'a')
                    save_progress(done)

    # Flush whatever remains after the pool drains.
    if results:
        ordered = [results[_id] for _id in hmdb_ids if _id in results]
        if ordered:
            write_csv(ordered, '代谢物数据_最终.csv', 'a')

    if failed:
        save_failed(failed)
    save_progress(done)
    return ok, bad, failed
```
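A minimal backfill sketch, assuming you want to retry only the IDs recorded in 失败.txt (this helper is not part of the original script):

```python
# Hypothetical backfill helper: rebuild id.txt from the tab-separated
# "id<TAB>error" lines that save_failed() writes, then re-run the extractor.
with open('失败.txt', encoding='utf-8') as f:
    retry_ids = [line.split('\t')[0] for line in f if line.strip()]

with open('id.txt', 'w', encoding='utf-8') as f:
    f.write('\n'.join(retry_ids) + '\n')
```

Failed IDs are never added to progress.json, so simply re-running against the original id.txt retries them as well.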
Final ordering and the entry point:

```python
def sort_by_original_order(original_ids, csv_file):
    try:
        import pandas as pd
        df = pd.read_csv(csv_file, encoding='utf-8-sig')
        if df.empty or 'HMDB_ID' not in df.columns:
            return
        order = {i: idx for idx, i in enumerate(original_ids)}
        df['sort_key'] = df['HMDB_ID'].map(order)
        df.sort_values('sort_key').drop('sort_key', axis=1).to_csv(
            csv_file, index=False, encoding='utf-8-sig')
    except ImportError:
        print("Note: pandas is not installed; skipping the final sort.")
    except Exception as e:
        print(f"Sort error: {e}")


def main():
    print("=" * 80)
    print("HMDB Metabolite Data Extractor")
    print("=" * 80)
    if not os.path.exists('id.txt'):
        print("Error: id.txt is missing")
        input("Press Enter to exit...")
        return
    with open('id.txt', 'r', encoding='utf-8') as f:
        ids = f.read().splitlines()
    invalid, duplicates = check_ids(ids)
    valid = [i for i in ids if i.startswith("HMDB")]
    unique = list(dict.fromkeys(valid))  # de-duplicate, preserving order
    if not os.path.exists('progress.json'):
        # Fresh run: write the CSV header.
        write_csv([], '代谢物数据_最终.csv', 'w')
    ok, bad, failed = process_ids(unique)
    print("\nReordering the CSV by original ID order...")
    sort_by_original_order(unique, '代谢物数据_最终.csv')
    print("=" * 80)
    print(f"Total: {len(unique)} | ok: {ok} | failed: {bad}")
    print("Output: 代谢物数据_最终.csv")
    print("=" * 80)


if __name__ == "__main__":
    main()
```

## Video Version

Bilibili · YouTube