import time, csv, os, re, json, threading
from collections import Counter
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime

import requests
from lxml import html
import psutil

HEADERS = {"User-Agent": "Mozilla/5.0 ..."}
MAX_WORKERS = 5
REQUESTS_PER_SECOND = 5
BATCH_WRITE_SIZE = 500            # rows buffered in memory before a CSV flush
PROGRESS_REPORT_INTERVAL = 100    # print a status line every N completed IDs
REQUEST_TIMEOUT = (5, 15)         # (connect, read) timeout in seconds
MAX_RETRIES = 2

class RateLimiter:
    """Token-bucket limiter shared by all worker threads."""

    def __init__(self, rate):
        self.rate = rate              # tokens added per second; also the bucket size
        self.tokens = rate
        self.last_update = time.time()
        self.lock = threading.Lock()

    def acquire(self):
        """Take one token. Returns True, or the seconds to wait for the next token."""
        with self.lock:
            now = time.time()
            self.tokens = min(self.rate, self.tokens + (now - self.last_update) * self.rate)
            self.last_update = now
            if self.tokens >= 1:
                self.tokens -= 1
                return True
            return (1 - self.tokens) / self.rate

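# A minimal usage sketch (illustrative only, not part of the pipeline): with a
# rate of 5/s the bucket starts full, so five acquire() calls return True and
# the sixth returns the fractional wait, in seconds, until a token is free:
#
#   rl = RateLimiter(5)
#   for _ in range(5):
#       assert rl.acquire() is True
#   wait = rl.acquire()   # a small float, roughly 0.2s; caller sleeps and retries
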
rate_limiter = RateLimiter(REQUESTS_PER_SECOND)

def check_ids(ids):
    """Return (invalid, duplicates): positions of IDs that don't look like HMDB
    accessions, and an {id: count} map of IDs that appear more than once."""
    counts = Counter(ids)
    duplicates = {i: c for i, c in counts.items() if c > 1}
    invalid = []
    for idx, i in enumerate(ids, 1):
        if not i.startswith("HMDB"):
            invalid.append((idx, i))
    return invalid, duplicates

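# For example (hypothetical IDs):
#
#   check_ids(["HMDB0000001", "BAD01", "HMDB0000001"])
#   # -> ([(2, "BAD01")], {"HMDB0000001": 2})
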
def clean_text(s):
    """Strip control characters and zero-width spaces, collapse whitespace."""
    s = re.sub(r'[\x00-\x1F\u200B-\u200D\uFEFF]', '', s or '')
    s = re.sub(r'\s+', ' ', s)
    return s.strip()

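# For example, zero-width characters disappear and whitespace runs collapse:
#
#   clean_text("  Homo\u200bcysteine\n  levels ")   # -> "Homocysteine levels"
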
def fetch(hmdb_id):
    """Fetch one metabolite page, honouring the shared rate limiter."""
    url = f"https://hmdb.ca/metabolites/{hmdb_id}"
    # Loop until a token is actually consumed; the original single retry after
    # the sleep ignored acquire()'s result and could race with other workers.
    while True:
        wait = rate_limiter.acquire()
        if wait is True:
            break
        time.sleep(wait)
    resp = requests.get(url, headers=HEADERS, timeout=REQUEST_TIMEOUT)
    resp.raise_for_status()
    return html.fromstring(resp.content)

def parse(tree, hmdb_id):
    """Extract fields from the metabolite page into a flat dict."""
    data = {"HMDB_ID": hmdb_id}

    cf = tree.xpath('//th[text()="Chemical Formula"]/following-sibling::td[1]')
    data["Chemical_Formula"] = clean_text(''.join(cf[0].xpath('.//text()'))) if cf else ""

    # ... extraction of the remaining columns is elided in the source; they
    # follow the same th/td pattern.

    return data

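# A hedged sketch of how one more column could be filled in, assuming the HMDB
# page labels the row exactly "SMILES" (verify against the live page first):
#
#   sm = tree.xpath('//th[text()="SMILES"]/following-sibling::td[1]')
#   data["SMILES"] = clean_text(''.join(sm[0].xpath('.//text()'))) if sm else ""
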
def should_retry(exc, status=None):
    """Retry on timeouts, connection errors, and transient HTTP statuses."""
    if isinstance(exc, requests.exceptions.Timeout):
        return True
    if isinstance(exc, requests.exceptions.ConnectionError):
        return True
    if isinstance(exc, requests.exceptions.HTTPError):
        if status in (429, 502, 503, 504):   # rate-limited / transient server errors
            return True
        if status in (400, 403, 404):        # client errors: retrying won't help
            return False
        return True
    return True

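# For example (constructed exceptions, for illustration):
#
#   should_retry(requests.exceptions.HTTPError(), status=503)   # -> True  (transient)
#   should_retry(requests.exceptions.HTTPError(), status=404)   # -> False (permanent)
#   should_retry(requests.exceptions.Timeout())                 # -> True
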
def get_with_retry(hmdb_id, retries=MAX_RETRIES):
    """Fetch and parse one ID with exponential backoff, honouring Retry-After.
    Returns the parsed dict, or None after the retries are exhausted."""
    last = None
    for attempt in range(retries):
        try:
            tree = fetch(hmdb_id)
            return parse(tree, hmdb_id)
        except requests.exceptions.HTTPError as e:
            # Note "is not None": a 4xx/5xx Response is falsy in requests, so a
            # bare truthiness test would discard the status code we need here.
            last, status = e, (e.response.status_code if e.response is not None else None)
            if not should_retry(e, status):
                break
            if attempt < retries - 1:
                wait = 2 ** attempt
                ra = e.response.headers.get('Retry-After') if e.response is not None else None
                if ra:
                    try:
                        wait = max(wait, int(ra))
                    except ValueError:   # Retry-After may be an HTTP date; ignore it
                        pass
                time.sleep(wait)
        except Exception as e:
            last = e
            if not should_retry(e):
                break
            if attempt < retries - 1:
                time.sleep(2 ** attempt)
    print(f"[{hmdb_id}] retries exhausted: {last}")
    return None

def mem_mb():
    """Resident memory of this process in MB (0 if psutil fails)."""
    try:
        return psutil.Process().memory_info().rss / 1024 / 1024
    except Exception:
        return 0

def write_csv(rows, file, mode='a'):
    cols = [
        'HMDB_ID', 'Common_Name', 'Description', 'Synonyms', 'Chemical_Formula',
        'Average_Molecular_Weight', 'Monoisotopic_Molecular_Weight', 'IUPAC_Name',
        'Traditional_Name', 'CAS_Registry_Number', 'SMILES', 'Kingdom', 'Super_Class',
        'Class', 'Sub_Class', 'Direct_Parent', 'Experimental_Molecular_Properties',
        'Predicted_Molecular_Properties', 'Pathways', 'Normal_Concentrations',
        'Abnormal_Concentrations', 'Disease_References', 'Associated_OMIM_IDs',
        'KEGG_Compound_ID', 'PubChem_Compound'
    ]
    exists = os.path.isfile(file)
    # utf-8-sig writes a BOM so Excel opens the file with the right encoding.
    with open(file, mode, newline='', encoding='utf-8-sig') as f:
        w = csv.DictWriter(f, fieldnames=cols, quoting=csv.QUOTE_ALL)
        if not exists or mode == 'w':
            w.writeheader()
        w.writerows(rows)

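# Rows may carry only a subset of the columns (parse() above fills just a few);
# DictWriter substitutes its restval (the empty string by default) for missing
# keys, so partial dicts still produce well-formed rows. For example (with a
# hypothetical file name):
#
#   write_csv([{'HMDB_ID': 'HMDB0000001'}], 'demo.csv', 'w')
#   # -> header row plus one row with every other column empty
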
def save_failed(failed, file='failed.txt'):
    with open(file, 'w', encoding='utf-8') as f:
        for i, err in failed:
            f.write(f"{i}\t{err}\n")

def save_progress(done, file='progress.json'):
    data = {'processed_ids': list(done), 'timestamp': datetime.now().isoformat(), 'count': len(done)}
    with open(file, 'w', encoding='utf-8') as f:
        json.dump(data, f, ensure_ascii=False, indent=2)

def load_progress(file='progress.json'):
    if os.path.exists(file):
        try:
            with open(file, 'r', encoding='utf-8') as f:
                return set(json.load(f).get('processed_ids', []))
        except (OSError, json.JSONDecodeError):   # unreadable/corrupt progress: start fresh
            pass
    return set()

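# progress.json, as written by save_progress(), looks like this (IDs and
# timestamp are illustrative):
#
#   {
#     "processed_ids": ["HMDB0000001", "HMDB0000002"],
#     "timestamp": "2024-01-01T12:00:00",
#     "count": 2
#   }
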
def process_ids(hmdb_ids):
    done = load_progress()
    remain = [i for i in hmdb_ids if i not in done]
    if not remain:
        print("All IDs have already been processed!")
        return len(hmdb_ids), 0, []
    results, failed = {}, []
    # Counters resume from the saved progress: curr/ok start at len(done).
    total, curr, ok, bad = len(hmdb_ids), len(done), len(done), 0
    start = time.time()
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as ex:
        fut = {ex.submit(get_with_retry, i): i for i in remain}
        for f in as_completed(fut):
            i = fut[f]
            curr += 1
            try:
                d = f.result()
                if d:
                    results[i] = d
                    done.add(i)
                    ok += 1
                else:
                    failed.append((i, "fetch failed"))
                    bad += 1
            except Exception as e:
                failed.append((i, str(e)))
                bad += 1
            if curr % PROGRESS_REPORT_INTERVAL == 0 or curr == total:
                left = total - curr
                pct = curr / total * 100
                print(f"Progress: {curr}/{total} ({pct:.1f}%) | remaining: {left} | ok: {ok} | "
                      f"failed: {bad} | elapsed: {time.time() - start:.0f}s | mem: {mem_mb():.1f}MB")
            # Flush completed rows in original input order once the batch fills.
            if len(results) >= BATCH_WRITE_SIZE:
                ordered = []
                for _id in hmdb_ids:
                    if _id in results:
                        ordered.append(results.pop(_id))
                if ordered:
                    write_csv(ordered, 'metabolites_final.csv', 'a')
                save_progress(done)
    # Flush whatever is left after the pool drains.
    if results:
        ordered = [results[_id] for _id in hmdb_ids if _id in results]
        if ordered:
            write_csv(ordered, 'metabolites_final.csv', 'a')
    if failed:
        save_failed(failed)
    save_progress(done)
    return ok, bad, failed

def sort_by_original_order(original_ids, csv_file):
    """Rewrite the CSV so rows follow the order of the input ID list."""
    try:
        import pandas as pd
        df = pd.read_csv(csv_file, encoding='utf-8-sig')
        if df.empty or 'HMDB_ID' not in df.columns:
            return
        order = {i: idx for idx, i in enumerate(original_ids)}
        df['sort_key'] = df['HMDB_ID'].map(order)
        df.sort_values('sort_key').drop('sort_key', axis=1).to_csv(csv_file, index=False, encoding='utf-8-sig')
    except ImportError:
        print("Note: pandas is not installed; skipping the final sort.")
    except Exception as e:
        print(f"Sort failed: {e}")

def main():
    print("=" * 80)
    print("HMDB Metabolite Data Extraction Tool")
    print("=" * 80)
    if not os.path.exists('id.txt'):
        print("Error: id.txt not found")
        input("Press Enter to exit...")
        return
    with open('id.txt', 'r', encoding='utf-8') as f:
        ids = [line.strip() for line in f if line.strip()]
    invalid, duplicates = check_ids(ids)
    if invalid:
        print(f"Warning: {len(invalid)} ID(s) do not start with 'HMDB' and will be skipped")
    if duplicates:
        print(f"Warning: {len(duplicates)} duplicated ID(s); only the first occurrence is kept")
    valid = [i for i in ids if i.startswith("HMDB")]
    unique = list(dict.fromkeys(valid))   # de-duplicate while preserving order
    if not os.path.exists('progress.json'):
        write_csv([], 'metabolites_final.csv', 'w')   # fresh run: write the header row
    ok, bad, failed = process_ids(unique)
    print("\nSorting the CSV into the original ID order...")
    sort_by_original_order(unique, 'metabolites_final.csv')
    print("=" * 80)
    print(f"Total: {len(unique)} | ok: {ok} | failed: {bad}")
    print("Output: metabolites_final.csv")
    print("=" * 80)

if __name__ == "__main__":
    main()
 