An Automated Tool for Scraping Exogenous Metabolite Food Sources from HMDB

Original post: https://itxiaozhang.com/auto-hmdb-exogenous-metabolite-food-source

1. Requirements

Where a metabolite comes from matters for scientific research and nutrition analysis. In the existing HMDB database, the source information for exogenous metabolites is scattered across individual web pages, so collecting it by hand is slow and error-prone. The goal is therefore a tool that scrapes the exogenous source information of metabolites in bulk and produces structured, analysis-ready CSV data.

2. Feature Overview

- Scope: only the Disposition → Exogenous section of HMDB, i.e. a metabolite's exogenous sources.
- Output: a CSV file in which each record holds the HMDB ID, food name, and FooDB number; anything missing or failed is marked None.
- Automation and robustness (a concrete sketch follows at the end of this post):
  - multithreaded concurrent scraping for throughput
  - a random User-Agent per request to avoid blocking
  - automatic retries and error handling
  - an automatic save after every batch of records, so no data is lost
- Use cases: food-metabolite association analysis, nutrition research, and database construction.

3. Program Structure

The core program is written in Python and covers:

- loading the list of HMDB IDs
- requesting each metabolite's page
- extracting the exogenous food-source information
- writing a CSV file where each record holds the HMDB ID, food name, and matching number
- error handling and logging, so that every ID ends up with a result

Schematic code:

```python
def get_metabolite_data(hmdb_id):
    """
    Fetch the exogenous food-source information for one HMDB ID.
    The returned dict contains:
    - 'HMDB ID'
    - 'Source(Exogenous)' (food name and number; 'None' when not found)
    """
    data = {'HMDB ID': hmdb_id, 'Source(Exogenous)': '...'}
    return data
```

Video version: Bilibili · YouTube
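To make the measures in section 2 concrete, here is a minimal sketch of how the concurrency, random User-Agent, retry, and periodic-save pieces could fit together. It reuses the get_metabolite_data stub from section 3; USER_AGENTS, SAVE_EVERY, and the helper names are illustrative, not the tool's actual code.

```python
# A sketch only: get_metabolite_data() is the stub from section 3;
# USER_AGENTS, SAVE_EVERY and the helper names are illustrative.
import csv
import random
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) ...",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) ...",
]
SAVE_EVERY = 50  # flush to disk every N finished records

def random_headers():
    # a random User-Agent per request lowers the risk of being blocked;
    # the real get_metabolite_data() would pass this to requests.get()
    return {"User-Agent": random.choice(USER_AGENTS)}

def fetch_with_retry(hmdb_id, retries=3):
    """Call the scraper with exponential backoff; never raise."""
    for attempt in range(retries):
        try:
            return get_metabolite_data(hmdb_id)
        except Exception:
            time.sleep(2 ** attempt)  # back off, then retry
    # give up but still emit a row, so every ID is accounted for
    return {"HMDB ID": hmdb_id, "Source(Exogenous)": "None"}

def append_rows(rows, out_file):
    with open(out_file, "a", newline="", encoding="utf-8-sig") as f:
        w = csv.DictWriter(f, fieldnames=["HMDB ID", "Source(Exogenous)"])
        if f.tell() == 0:  # first write: add the header row
            w.writeheader()
        w.writerows(rows)

def scrape_all(hmdb_ids, out_file="food_sources.csv", workers=5):
    buffer = []
    with ThreadPoolExecutor(max_workers=workers) as ex:
        futures = {ex.submit(fetch_with_retry, i): i for i in hmdb_ids}
        for n, fut in enumerate(as_completed(futures), 1):
            buffer.append(fut.result())
            if n % SAVE_EVERY == 0:  # periodic save protects the data
                append_rows(buffer, out_file)
                buffer.clear()
    if buffer:
        append_rows(buffer, out_file)
```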

December 27, 2025 · 1 min · IT小章

Batch Scraping and Structured Export of HMDB Metabolites

Original post: https://itxiaozhang.com/hmdb-metabolite-batch-extraction-csv-export/

Requirements

- Automatically fetch the full field set for many HMDB IDs, cutting down manual lookups and omissions.
- Stable and reliable: cope with rate limiting and network hiccups without dying, and resume interrupted runs.
- Usable output: uniform fields in the original ID order, ready for downstream analysis and reproduction.

How to Run the Program

Preparation: put the IDs to process in id.txt (e.g. HMDB0000123), one per line, then install the dependencies:

```bash
pip install requests lxml psutil pandas
```

Run:

```bash
python HMDB_Metabolite_Extractor.py
```

Results:

- Data: 代谢物数据_最终.csv (rows in the original ID order).
- Failures: 失败.txt (convenient for backfilling; see the note at the end of this post).
- Progress: progress.json (checkpoint for resuming).
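Once a run finishes, the export is easy to sanity-check. A minimal example, assuming pandas (already among the dependencies) and the default output name:

```python
import pandas as pd

# load the export and run a few quick checks
df = pd.read_csv("代谢物数据_最终.csv", encoding="utf-8-sig")
print(df.shape)                  # expect one row per ID and 25 columns
print(df["HMDB_ID"].is_unique)   # True if no ID was written twice
print(df["Chemical_Formula"].head())
```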
Overall Framework

- I/O layer: read id.txt; write 代谢物数据_最终.csv; persist failures and progress.
- Scraping layer: requests plus a global RateLimiter to cap QPS; uniform timeout and retry policy.
- Parsing layer: lxml with fine-grained XPath, covering 25 key fields (name, chemical formula, molecular weight, classification hierarchy, properties, pathways, concentrations, disease references, external IDs, and more).
- Concurrency layer: ThreadPoolExecutor; batched writes to keep memory down; progress reporting and time estimates.
- Ordering and wrap-up: re-sort by the original ID order; clean up temporary files; print summary statistics.

Core Code Framework

```python
import time, csv, os, re, json, threading
from collections import Counter
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime

import requests
import psutil
from lxml import html

HEADERS = {"User-Agent": "Mozilla/5.0 ..."}
MAX_WORKERS = 5
REQUESTS_PER_SECOND = 5
BATCH_WRITE_SIZE = 500
PROGRESS_REPORT_INTERVAL = 100
REQUEST_TIMEOUT = (5, 15)  # (connect, read) timeouts in seconds
MAX_RETRIES = 2

class RateLimiter:
    """Token-bucket limiter shared by all worker threads."""
    def __init__(self, rate):
        self.rate = rate
        self.tokens = rate
        self.last_update = time.time()
        self.lock = threading.Lock()

    def acquire(self):
        with self.lock:
            now = time.time()
            # refill according to elapsed time, capped at the bucket size
            self.tokens = min(self.rate, self.tokens + (now - self.last_update) * self.rate)
            self.last_update = now
            if self.tokens >= 1:
                self.tokens -= 1
                return True
            # not enough tokens: tell the caller how long to sleep
            return (1 - self.tokens) / self.rate

rate_limiter = RateLimiter(REQUESTS_PER_SECOND)

def check_ids(ids):
    """Return malformed IDs (with line numbers) and duplicate counts."""
    invalid, dup = [], Counter(ids)
    duplicates = {i: c for i, c in dup.items() if c > 1}
    for idx, i in enumerate(ids, 1):
        if not i.startswith("HMDB"):
            invalid.append((idx, i))
    return invalid, duplicates

def clean_text(s):
    # strip control characters and zero-width marks, collapse whitespace
    s = re.sub(r'[\x00-\x1F\u200B-\u200D\uFEFF]', '', s or '')
    s = re.sub(r'\s+', ' ', s)
    return s.strip()

def fetch(hmdb_id):
    url = f"https://hmdb.ca/metabolites/{hmdb_id}"
    wait = rate_limiter.acquire()
    if wait is not True:
        time.sleep(wait)  # honour the limiter, then take a token
        rate_limiter.acquire()
    resp = requests.get(url, headers=HEADERS, timeout=REQUEST_TIMEOUT)
    resp.raise_for_status()
    return html.fromstring(resp.content)

def parse(tree, hmdb_id):
    data = {"HMDB_ID": hmdb_id}
    # key field parsing
    cf = tree.xpath('//th[text()="Chemical Formula"]/following-sibling::td[1]')
    data["Chemical_Formula"] = ''.join(cf[0].xpath('.//text()')) if cf else ""
    # remaining fields: Average/Mono Weight, IUPAC, CAS, SMILES, classification
    # hierarchy, properties, pathways, concentrations, disease references, external IDs...
    # ...
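    # An illustrative continuation (an assumption: it mirrors the XPath
    # pattern above but is not verified against the live HMDB markup):
    # amw = tree.xpath('//th[text()="Average Molecular Weight"]/following-sibling::td[1]')
    # data["Average_Molecular_Weight"] = clean_text(''.join(amw[0].xpath('.//text()'))) if amw else ""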
    return data

def should_retry(exc, status=None):
    """Retry transient failures; give up on client errors."""
    if isinstance(exc, requests.exceptions.Timeout):
        return True
    if isinstance(exc, requests.exceptions.ConnectionError):
        return True
    if isinstance(exc, requests.exceptions.HTTPError):
        if status in [429, 502, 503, 504]:
            return True
        if status in [404, 400, 403]:
            return False
        return True
    return True

def get_with_retry(hmdb_id, retries=MAX_RETRIES):
    last = None
    for attempt in range(retries):
        try:
            tree = fetch(hmdb_id)
            return parse(tree, hmdb_id)
        except requests.exceptions.HTTPError as e:
            # check "is not None": a Response with an error status is falsy,
            # so a plain truthiness test would hide the status code
            last, status = e, (e.response.status_code if e.response is not None else None)
            if not should_retry(e, status):
                break
            if attempt < retries - 1:
                wait = 2 ** attempt
                ra = e.response.headers.get('Retry-After') if e.response is not None else None
                if ra:
                    try:
                        wait = max(wait, int(ra))  # honour Retry-After if present
                    except ValueError:
                        pass
                time.sleep(wait)
        except Exception as e:
            last = e
            if not should_retry(e):
                break
            if attempt < retries - 1:
                time.sleep(2 ** attempt)  # exponential backoff
    print(f"[{hmdb_id}] all retries failed: {last}")
    return None

def mem_mb():
    try:
        return psutil.Process().memory_info().rss / 1024 / 1024
    except Exception:
        return 0

def write_csv(rows, file, mode='a'):
    cols = [
        'HMDB_ID', 'Common_Name', 'Description', 'Synonyms', 'Chemical_Formula',
        'Average_Molecular_Weight', 'Monoisotopic_Molecular_Weight', 'IUPAC_Name',
        'Traditional_Name', 'CAS_Registry_Number', 'SMILES', 'Kingdom', 'Super_Class',
        'Class', 'Sub_Class', 'Direct_Parent', 'Experimental_Molecular_Properties',
        'Predicted_Molecular_Properties', 'Pathways', 'Normal_Concentrations',
        'Abnormal_Concentrations', 'Disease_References', 'Associated_OMIM_IDs',
        'KEGG_Compound_ID', 'PubChem_Compound'
    ]
    exists = os.path.isfile(file)
    with open(file, mode, newline='', encoding='utf-8-sig') as f:
        w = csv.DictWriter(f, fieldnames=cols, quoting=csv.QUOTE_ALL)
        if not exists or mode == 'w':
            w.writeheader()
        w.writerows(rows)

def save_failed(failed, file='失败.txt'):
    with open(file, 'w', encoding='utf-8') as f:
        for i, err in failed:
            f.write(f"{i}\t{err}\n")

def save_progress(done, file='progress.json'):
    data = {'processed_ids': list(done),
            'timestamp': datetime.now().isoformat(),
            'count': len(done)}
    with open(file, 'w', encoding='utf-8') as f:
        json.dump(data, f, ensure_ascii=False, indent=2)

def load_progress(file='progress.json'):
    if os.path.exists(file):
        try:
            with open(file, 'r', encoding='utf-8') as f:
                return set(json.load(f).get('processed_ids', []))
        except Exception:
            pass
    return set()

def process_ids(hmdb_ids):
    done = load_progress()
    remain = [i for i in hmdb_ids if i not in done]
    if not remain:
        print("All IDs have already been processed!")
        return len(hmdb_ids), 0, []
    results, failed = {}, []
    total, curr, ok, bad = len(hmdb_ids), len(done), len(done), 0
    start = time.time()  # run start, usable for ETA reporting
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as ex:
        fut = {ex.submit(get_with_retry, i): i for i in remain}
        for f in as_completed(fut):
            i = fut[f]
            curr += 1
            try:
                d = f.result()
                if d:
                    results[i] = d
                    done.add(i)
                    ok += 1
                else:
                    failed.append((i, "fetch failed"))
                    bad += 1
            except Exception as e:
                failed.append((i, str(e)))
                bad += 1
            if curr % PROGRESS_REPORT_INTERVAL == 0 or curr == total:
                left = total - curr
                pct = curr / total * 100
                print(f"progress: {curr}/{total} ({pct:.1f}%) | left: {left} | "
                      f"ok: {ok} | failed: {bad} | mem: {mem_mb():.1f}MB")
            if len(results) >= BATCH_WRITE_SIZE:
                # flush finished records in original ID order to cap memory use
                ordered = []
                for _id in hmdb_ids:
                    if _id in results:
                        ordered.append(results.pop(_id))
                if ordered:
                    write_csv(ordered, '代谢物数据_最终.csv', 'a')
                save_progress(done)
    if results:
        ordered = [results[_id] for _id in hmdb_ids if _id in results]
        if ordered:
            write_csv(ordered, '代谢物数据_最终.csv', 'a')
    if failed:
        save_failed(failed)
    save_progress(done)
    return ok, bad, failed

def sort_by_original_order(original_ids, csv_file):
    try:
        import pandas as pd
        df = pd.read_csv(csv_file, encoding='utf-8-sig')
        if df.empty or 'HMDB_ID' not in df.columns:
            return
        order = {i: idx for idx, i in enumerate(original_ids)}
        df['sort_key'] = df['HMDB_ID'].map(order)
        df.sort_values('sort_key').drop('sort_key', axis=1) \
          .to_csv(csv_file, index=False, encoding='utf-8-sig')
    except ImportError:
        print("Note: pandas is not installed, skipping the final sort.")
    except Exception as e:
        print(f"sort failed: {e}")

def main():
    print("=" * 80)
    print("HMDB metabolite extraction tool")
    print("=" * 80)
    if not os.path.exists('id.txt'):
        print("Error: id.txt is missing")
        input("Press Enter to exit...")
        return
    with open('id.txt', 'r', encoding='utf-8') as f:
        ids = f.read().splitlines()
    invalid, duplicates = check_ids(ids)
    if invalid or duplicates:
        print(f"Warning: {len(invalid)} malformed and {len(duplicates)} duplicated IDs")
    valid = [i for i in ids if i.startswith("HMDB")]
    unique = list(dict.fromkeys(valid))
    if not os.path.exists('progress.json'):
        # fresh run: create the CSV with just the header row
        write_csv([], '代谢物数据_最终.csv', 'w')
    ok, bad, failed = process_ids(unique)
    print("\nRe-sorting the CSV by the original ID order...")
    sort_by_original_order(unique, '代谢物数据_最终.csv')
    print("=" * 80)
    print(f"total: {len(unique)} | ok: {ok} | failed: {bad}")
    print("output: 代谢物数据_最终.csv")
    print("=" * 80)

if __name__ == "__main__":
    main()
```

Video version: Bilibili · YouTube
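A note on backfilling: progress.json records only successful IDs, so simply re-running the script retries everything listed in 失败.txt. To inspect the failures first, a small helper like this (hypothetical, not part of the script) is enough:

```python
# read the tab-separated log written by save_failed(): "<ID>\t<error>"
with open("失败.txt", encoding="utf-8") as f:
    failed = [line.rstrip("\n").split("\t", 1) for line in f if line.strip()]

for hmdb_id, reason in failed:
    print(hmdb_id, "->", reason)
```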

November 3, 2025 · 5 min · IT小章