import requests
import json
import time
from typing import List, Dict, Any
from abc import ABC, abstractmethod


class BaseExtractor(ABC):
    """Abstract base class - the data-extractor interface."""

    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.session = self._create_session()

    def _create_session(self) -> requests.Session:
        """Create an HTTP session."""
        session = requests.Session()
        session.headers.update(self.config.get('headers', {}))
        return session

    @abstractmethod
    def search(self, query: str) -> List[str]:
        """Search method - subclasses must implement this."""
        pass

    @abstractmethod
    def extract(self, item_id: str) -> Dict[str, Any]:
        """Extraction method - subclasses must implement this."""
        pass


class CacheManager:
    """Simple JSON-file-backed cache."""

    def __init__(self, cache_file: str):
        self.cache_file = cache_file
        self.data = self._load()

    def _load(self) -> Dict:
        try:
            with open(self.cache_file, 'r') as f:
                return json.load(f)
        except FileNotFoundError:
            return {}

    def get(self, key: str) -> Any:
        return self.data.get(key)

    def set(self, key: str, value: Any):
        self.data[key] = value
        self._save()

    def _save(self):
        with open(self.cache_file, 'w') as f:
            json.dump(self.data, f)


class DataProcessor:
    """Data processing helpers."""

    @staticmethod
    def process_batch(items: List[str], processor_func) -> List[Dict]:
        """Process a batch of items, skipping any that fail."""
        results = []
        for item in items:
            try:
                result = processor_func(item)
                if result:
                    results.append(result)
            except Exception as e:
                print(f"Failed to process {item}: {e}")
        return results

    @staticmethod
    def save_results(data: List[Dict], filename: str):
        """Save the results to a JSON file."""
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(data, f, ensure_ascii=False, indent=2)


class WebExtractor(BaseExtractor):
    """Concrete extractor implementation."""

    def __init__(self, config: Dict[str, Any]):
        super().__init__(config)
        self.cache = CacheManager(config.get('cache_file', 'cache.json'))

    def search(self, query: str) -> List[str]:
        """Search implementation."""
        # Check the cache first
        cache_key = f"search_{query}"
        cached = self.cache.get(cache_key)
        if cached:
            return cached
        # Simulate a network request
        time.sleep(1)  # request delay
        # The actual search logic goes here; a real project would call the
        # target API and parse the response.
        results = self._perform_search(query)
        # Cache the results
        self.cache.set(cache_key, results)
        return results

    def extract(self, item_id: str) -> Dict[str, Any]:
        """Extraction implementation."""
        # Check the cache first
        cached = self.cache.get(item_id)
        if cached:
            return cached
        # Simulate a network request
        time.sleep(0.5)
        # The actual extraction logic goes here
        data = self._perform_extraction(item_id)
        # Cache the result
        if data:
            self.cache.set(item_id, data)
        return data

    def _perform_search(self, query: str) -> List[str]:
        """Perform the search - the concrete logic depends on the target site."""
        # The real HTTP request and HTML parsing would go here
        return [f"item_{i}" for i in range(3)]  # example return value

    def _perform_extraction(self, item_id: str) -> Dict[str, Any]:
        """Perform the extraction - the concrete logic depends on the target site."""
        # The real data-extraction logic would go here
        return {
            'id': item_id,
            'title': f'Title for {item_id}',
            'data': f'Data for {item_id}'
        }


class Application:
    """Main application class."""

    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.extractor = WebExtractor(config)
        self.processor = DataProcessor()

    def run(self, queries: List[str]):
        """Run the application."""
        all_results = []
        for query in queries:
            # Search for matching items
            items = self.extractor.search(query)
            # Extract detailed data for each item
            for item_id in items:
                data = self.extractor.extract(item_id)
                if data:
                    data['query'] = query
                    all_results.append(data)
        # Save the results
        self.processor.save_results(all_results, 'results.json')
        print(f"Done: collected {len(all_results)} records")


def main():
    """Entry point."""
    # Configuration
    config = {
        'headers': {'User-Agent': 'Generic-Bot/1.0'},
        'cache_file': 'app_cache.json',
        'delay': 1.0
    }
    # Create the application instance
    app = Application(config)
    # Example queries
    test_queries = ['query1', 'query2', 'query3']
    # Run the application
    app.run(test_queries)


if __name__ == '__main__':
    main()
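

# A possible concrete extension, sketched here as an illustration of how a
# subclass might fill in _perform_search / _perform_extraction against a JSON
# HTTP API. The base URL, query parameters, and response fields ('results',
# 'id', 'title', 'body') are assumptions for the sketch, not part of the
# framework above.
from typing import Optional


class JsonApiExtractor(WebExtractor):
    """Hypothetical extractor that queries a JSON API instead of returning stubs."""

    BASE_URL = 'https://api.example.com'  # placeholder endpoint

    def _perform_search(self, query: str) -> List[str]:
        # Query the (assumed) search endpoint and collect item ids from the payload
        resp = self.session.get(f'{self.BASE_URL}/search',
                                params={'q': query}, timeout=10)
        resp.raise_for_status()
        return [str(item['id']) for item in resp.json().get('results', [])]

    def _perform_extraction(self, item_id: str) -> Optional[Dict[str, Any]]:
        # Fetch a single item; return None for missing records so extract()
        # skips caching and run() skips the record
        resp = self.session.get(f'{self.BASE_URL}/items/{item_id}', timeout=10)
        if resp.status_code == 404:
            return None
        resp.raise_for_status()
        payload = resp.json()
        return {
            'id': item_id,
            'title': payload.get('title'),
            'data': payload.get('body')
        }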