原文地址:https://itxiaozhang.com/python-hmdb-mass-spectrometry-compound-extraction/
如果您需要远程电脑维修或者编程开发,请加我微信咨询。

需求分析

本工具旨在帮助用户根据质谱数据(质量数),自动从HMDB数据库批量检索代谢物信息,包括HMDB ID、常用名称、化学式及结构图片等,并导出为Excel可直接打开的CSV文件,极大提升代谢物注释效率。

程序介绍

  1. 准备数据:将所有待检索的质量数写入negative.txt文件,每行一个。
  2. 运行脚本
    • 双击或命令行运行hmdb_metabolite_extractor.py脚本。
    • 程序自动读取negative.txt,并按负离子模式批量检索HMDB。
    • 检索结果自动保存为代谢物数据.csv,日志信息保存在代谢物提取.log
  3. 主要功能模块
    • 质量数批量检索:自动根据质量数和离子模式检索HMDB ID
    • 代谢物详细信息抓取:根据HMDB ID获取常用名称、化学式、结构图片等信息
    • 并发加速与重试机制:采用多线程并发抓取,自动重试机制
    • 结果导出:自动整理为CSV文件,支持Excel直接打开

部分代码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
"""代谢物数据处理工具"""

import json, os, logging
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm
from typing import List, Dict, Optional

class DataManager:
    def __init__(self, cache_file='cache.json', progress_file='progress.json'):
        self.cache_file = cache_file
        self.progress_file = progress_file
        self.cache = self._load_json(cache_file, {})
        self.progress = self._load_json(progress_file, {'processed': []})
    
    def _load_json(self, file: str, default: Dict) -> Dict:
        if os.path.exists(file):
            try:
                with open(file, 'r', encoding='utf-8') as f:
                    return json.load(f)
            except: pass
        return default
    
    def save_progress(self, item_id: str):
        if item_id not in self.progress['processed']:
            self.progress['processed'].append(item_id)
            self._save_json(self.progress_file, self.progress)
    
    def _save_json(self, file: str, data: Dict):
        try:
            with open(file, 'w', encoding='utf-8') as f:
                json.dump(data, f)
        except Exception as e:
            logging.error(f"保存文件失败: {e}")
    
    def is_processed(self, item_id: str) -> bool:
        return item_id in self.progress['processed']
    
    def get_cache(self, key: str) -> Optional[Dict]:
        return self.cache.get(key)
    
    def set_cache(self, key: str, data: Dict):
        self.cache[key] = data
        self._save_json(self.cache_file, self.cache)

def setup_logging():
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler("处理日志.log", encoding='utf-8'),
            logging.StreamHandler()
        ]
    )

def process_data(items: List[str], resume: bool = False) -> List[Dict]:
    """处理数据的主要函数
    
    Args:
        items: 待处理的数据项列表
        resume: 是否继续上次的进度
        
    Returns:
        处理结果列表
    """
    data_manager = DataManager()
    results = []
    
    for item in items:
        # 检查是否已处理
        if resume and data_manager.is_processed(item):
            continue
            
        # 获取数据 - 
        data = {"id": item, "status": "processed"}
        results.append(data)
        
        # 保存进度
        data_manager.save_progress(item)
    
    return results

def main():
    setup_logging()
    try:
        # 读取输入数据 - 
        items = ["item1", "item2", "item3"]
        
        # 处理数据
        results = process_data(items, resume=True)
        
        # 保存结果 - 
        if results:
            logging.info(f"成功处理 {len(results)} 条数据")
                
    except Exception as e:
        logging.error(f"程序执行出错: {e}")

if __name__ == "__main__":
    main()

视频版本