1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96
| """ ================================ 作者:IT小章 网站:itxiaozhang.com 时间:2024年10月11日 Copyright © 2024 IT小章 ================================ """
import csv import time import requests from concurrent.futures import ThreadPoolExecutor, as_completed from lxml import html import os from collections import Counter import sys
HEADERS = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36' }
NEW_COLUMNS = [ "exists in all living species, ranging from bacteria to humans", "only found in individuals that have used or taken this drug", "not a naturally occurring metabolite and is only found in those individuals exposed to this compound or its derivatives" ]
def check_ids(ids): invalid_ids = [] id_counts = Counter(ids) duplicates = {id: count for id, count in id_counts.items() if count > 1} for index, id in enumerate(ids, 1): if not id.startswith("HMDB"): invalid_ids.append((index, id)) return invalid_ids, duplicates
def get_metabolite_data(hmdb_id): pass
def get_metabolite_data_with_retry(hmdb_id, max_retries=3): for attempt in range(max_retries): try: return get_metabolite_data(hmdb_id) except Exception: if attempt == max_retries - 1: print(f"[{hmdb_id}] 所有重试尝试均失败。") time.sleep(2 ** attempt) return None
def write_to_csv(results, filename, mode='a'): fieldnames = ['HMDB ID'] + NEW_COLUMNS + [ 'Super Class', 'Class', 'Sub Class', 'Disposition_source(Endogenous)', 'Endogenous(plant or animal or more)', 'Biological Properties_Biospecimen Locations', 'Biological Properties_Tissue Locations', 'KEGG Compound ID', 'ChEBI ID', 'METLIN ID' ] file_exists = os.path.isfile(filename) with open(filename, mode, newline='', encoding='utf-8') as csvfile: writer = csv.DictWriter(csvfile, fieldnames=fieldnames) if not file_exists or mode == 'w': writer.writeheader() writer.writerows(results)
def process_ids(hmdb_ids, max_workers=5): pass
def main(): try: if not os.path.exists('id.txt'): print("错误:找不到 id.txt 文件。") return
with open('id.txt', 'r') as f: hmdb_ids = f.read().splitlines() invalid_ids, duplicates = check_ids(hmdb_ids) valid_ids = [id for id in hmdb_ids if id.startswith("HMDB")] unique_ids = list(dict.fromkeys(valid_ids)) write_to_csv([], '代谢物数据_最终.csv', mode='w') success_count, failure_count = process_ids(unique_ids) print(f"数据提取完成。总计: {len(unique_ids)}, 成功: {success_count}, 失败: {failure_count}")
except Exception as e: print(f"发生错误: {e}")
if __name__ == "__main__": main()
|