1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
"""代谢物数据处理工具"""
import json, os, logging
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm
from typing import List, Dict, Optional
class DataManager:
    """Persists a result cache and a resume-progress list as JSON files.

    Both files are loaded eagerly on construction and rewritten in full on
    every mutation, so state survives process restarts.
    """

    def __init__(self, cache_file: str = 'cache.json', progress_file: str = 'progress.json'):
        """Load (or initialize) the cache and progress stores.

        Args:
            cache_file: Path of the JSON file holding cached item data.
            progress_file: Path of the JSON file listing processed item ids.
        """
        self.cache_file = cache_file
        self.progress_file = progress_file
        self.cache = self._load_json(cache_file, {})
        self.progress = self._load_json(progress_file, {'processed': []})

    def _load_json(self, file: str, default: Dict) -> Dict:
        """Return the parsed contents of *file*, or *default* if it is missing or unreadable."""
        if os.path.exists(file):
            try:
                with open(file, 'r', encoding='utf-8') as f:
                    return json.load(f)
            # Narrowed from a bare `except: pass`: only treat I/O failures and
            # corrupt JSON as "start fresh"; anything else should surface.
            except (OSError, json.JSONDecodeError):
                pass
        return default

    def save_progress(self, item_id: str):
        """Record *item_id* as processed and persist the progress file immediately."""
        if item_id not in self.progress['processed']:
            self.progress['processed'].append(item_id)
            self._save_json(self.progress_file, self.progress)

    def _save_json(self, file: str, data: Dict):
        """Write *data* to *file* as JSON; log (but do not raise) on failure."""
        try:
            with open(file, 'w', encoding='utf-8') as f:
                json.dump(data, f)
        except Exception as e:
            logging.error(f"保存文件失败: {e}")

    def is_processed(self, item_id: str) -> bool:
        """Return True if *item_id* has already been recorded as processed."""
        return item_id in self.progress['processed']

    def get_cache(self, key: str) -> Optional[Dict]:
        """Return the cached entry for *key*, or None when absent."""
        return self.cache.get(key)

    def set_cache(self, key: str, data: Dict):
        """Store *data* under *key* and persist the cache file immediately."""
        self.cache[key] = data
        self._save_json(self.cache_file, self.cache)
def setup_logging():
    """Configure root logging: INFO+ records go to both a UTF-8 log file and the console."""
    destinations = [
        logging.FileHandler("处理日志.log", encoding='utf-8'),
        logging.StreamHandler(),
    ]
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=destinations,
    )
def process_data(items: List[str], resume: bool = False,
                 data_manager: Optional["DataManager"] = None) -> List[Dict]:
    """Process each item id, optionally skipping ids already marked done.

    Args:
        items: Ids of the data items to process.
        resume: When True, skip items the progress store already lists.
        data_manager: Cache/progress store to use. When omitted, a default
            DataManager (cache.json / progress.json) is created — this keeps
            the original call signature working while allowing injection.

    Returns:
        One result dict per newly processed item.
    """
    if data_manager is None:
        data_manager = DataManager()
    results: List[Dict] = []
    for item in items:
        # Skip work already completed in a previous run.
        if resume and data_manager.is_processed(item):
            continue
        # Placeholder processing step; real fetching/transformation goes here.
        data = {"id": item, "status": "processed"}
        results.append(data)
        # Persist progress after each item so an interruption loses at most one.
        data_manager.save_progress(item)
    return results
def main():
    """Entry point: set up logging, process the input items, and report the count."""
    setup_logging()
    try:
        # Input data (placeholder list of item ids).
        items = ["item1", "item2", "item3"]
        results = process_data(items, resume=True)
        # Report how many items were actually processed this run.
        if results:
            logging.info(f"成功处理 {len(results)} 条数据")
    except Exception as e:
        logging.error(f"程序执行出错: {e}")


if __name__ == "__main__":
    main()