Mirage Flow数据预处理实战:Python爬虫采集与智能清洗管道构建

张开发
2026/6/1 10:04:19 15 分钟阅读
Mirage Flow数据预处理实战:Python爬虫采集与智能清洗管道构建
Mirage Flow数据预处理实战Python爬虫采集与智能清洗管道构建每次准备训练一个新模型最头疼的是什么十有八九是数据。要么数据不够要么质量太差要么格式乱七八糟。自己手动整理费时费力不说还容易出错。今天咱们就来聊聊怎么用Python爬虫加上Mirage Flow的智能处理能力搭一套自动化数据采集清洗管道把数据准备的活儿变得又快又好。简单来说这套方案能帮你从合规的公开渠道自动抓取你需要的文本数据然后智能地清洗、去重、分类最后打包成模型能直接“吃”的格式。整个过程基本不用你操心省下来的时间你可以多喝几杯咖啡或者多调几轮模型参数。1. 为什么你需要一套自动化数据管道在动手之前我们先看看传统数据准备方式有哪些坑。如果你做过模型训练下面这些场景肯定不陌生为了找数据你得上各种网站、论坛、文档库手动复制粘贴或者写一堆临时脚本。数据抓回来之后更麻烦重复的内容一大堆格式千奇百怪还有不少广告、乱码、无关信息混在里面。光是清洗这一步就能耗掉你大半天时间。更头疼的是这个过程很难复用。这次为项目A写了一套脚本下次项目B需求变了脚本又得重写。数据质量也参差不齐全凭运气和手工检查。而一套好的自动化数据管道应该能做到这几点源头可控从合规、稳定的公开渠道获取数据。处理智能能自动识别和过滤低质量、重复、无关的内容。流程可复现每次运行都能得到一致、高质量的结果。输出即用清洗后的数据格式最好能直接丢给训练脚本。接下来我们就用Python爬虫和Mirage Flow一步步搭建这样一个管道。2. 搭建基础用Python爬虫合规采集数据首先明确一点我们只从允许爬取、且不侵犯个人隐私和版权的公开信息源获取数据比如技术博客的公开文章、开源项目的文档、新闻网站的公开报道等。采集前务必查看网站的robots.txt文件并控制请求频率避免对目标服务器造成压力。2.1 选择一个简单的目标源为了演示我们假设需要采集某个技术社区例如一个模拟的公开技术博客站的文章数据。我们的目标是获取文章的标题、正文内容、发布时间和标签。我们会使用requests库来获取网页用BeautifulSoup库来解析HTML。如果你还没安装可以用下面的命令安装pip install requests beautifulsoup42.2 编写一个简单的定向爬虫下面是一个基础的爬虫脚本它负责从指定的博客列表页抓取文章链接然后进入详情页提取我们需要的信息。import requests from bs4 import BeautifulSoup import time import json def fetch_article_links(list_url): 从列表页获取所有文章链接 headers { User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 } try: resp requests.get(list_url, headersheaders, timeout10) resp.raise_for_status() # 检查请求是否成功 soup BeautifulSoup(resp.text, html.parser) # 假设文章链接在 class 为 ‘article-item’ 的 a 标签里 article_links [] for item in soup.find_all(a, class_article-item): link item.get(href) if link and link.startswith(http): article_links.append(link) elif link: # 处理相对路径 article_links.append(fhttps://example-blog.com{link}) return article_links except requests.RequestException as e: print(f获取列表页失败: {e}) return [] def parse_article_detail(article_url): 解析单篇文章详情页提取内容 headers {User-Agent: Your-Crawler-Bot/1.0} try: resp requests.get(article_url, headersheaders, timeout10) resp.raise_for_status() soup BeautifulSoup(resp.text, html.parser) # 根据目标网站的实际HTML结构调整选择器 title soup.find(h1, class_post-title).get_text(stripTrue) if soup.find(h1, class_post-title) else No Title # 获取正文这里假设正文在 article 标签或某个特定div里 content_div soup.find(article) or soup.find(div, class_post-content) content content_div.get_text(stripTrue, separator\n) if content_div else No Content # 获取标签 tags [] tag_elements soup.find_all(a, class_post-tag) for tag in tag_elements: tags.append(tag.get_text(stripTrue)) # 获取时间 time_element soup.find(time) publish_time time_element.get(datetime) if time_element else Unknown return { url: article_url, title: title, content: content, tags: tags, publish_time: publish_time, source: example_tech_blog } except requests.RequestException as e: print(f解析文章 {article_url} 失败: {e}) return None def run_crawler(start_url, max_articles50): 主爬虫函数 print(f开始从 {start_url} 采集数据...) all_articles [] article_links fetch_article_links(start_url)[:max_articles] # 限制数量 for i, link in enumerate(article_links): print(f正在处理第 {i1}/{len(article_links)} 篇文章: {link}) article_data parse_article_detail(link) if article_data: all_articles.append(article_data) time.sleep(1) # 礼貌性延迟避免请求过快 # 将原始数据保存为JSON文件 with open(raw_articles.json, w, encodingutf-8) as f: json.dump(all_articles, f, ensure_asciiFalse, indent2) print(f采集完成共获取 {len(all_articles)} 篇文章已保存至 raw_articles.json) return all_articles if __name__ __main__: # 替换成你要采集的真实、合规的列表页URL BLOG_LIST_URL https://example-blog.com/articles raw_data run_crawler(BLOG_LIST_URL, max_articles20)这个脚本跑完你会得到一个raw_articles.json文件里面是爬下来的原始数据。但这只是第一步数据还很“脏”接下来就该Mirage Flow上场了。3. 智能清洗用Mirage Flow提升数据质量原始数据通常包含大量噪音重复内容、无关信息如导航栏文本、广告、格式混乱、质量参差不齐的段落。手动处理这些几乎是不可能的任务。Mirage Flow的核心能力在于语义理解我们可以利用它来智能地清洗数据。3.1 接入Mirage Flow进行语义去重简单的文本对比比如MD5只能发现完全相同的重复但Mirage Flow能理解语义。比如“如何搭建Python环境”和“Python环境配置指南”两篇文章内容高度相似但字面不同语义去重就能把它们找出来。我们需要安装Mirage Flow的Python SDK。pip install mirage-flow-client然后我们可以编写一个去重函数。基本思路是为每篇文章生成一个语义向量Embedding然后计算向量之间的相似度如果相似度超过某个阈值就认为是重复内容。from mirage_flow_client import MirageFlowClient import numpy as np from sklearn.metrics.pairwise import cosine_similarity import json # 初始化客户端假设你已有API密钥和端点 client MirageFlowClient( api_keyyour_api_key_here, base_urlhttps://api.example.com # 替换为你的端点 ) def get_text_embedding(text): 获取文本的语义向量 # 如果文本太长可以截取核心部分例如前1000字符 truncated_text text[:1000] response client.embeddings.create( modeltext-embedding-model, # 使用合适的嵌入模型 inputtruncated_text ) return response.data[0].embedding def semantic_deduplicate(articles, similarity_threshold0.85): 基于语义向量进行去重 print(开始语义去重...) if not articles: return [] # 为每篇文章计算嵌入向量这里用标题正文前一部分代表文章 texts_for_embedding [f{art[title]} {art[content][:500]} for art in articles] embeddings [] for text in texts_for_embedding: try: emb get_text_embedding(text) embeddings.append(emb) except Exception as e: print(f为文本生成嵌入失败: {e}) embeddings.append(None) # 构建相似度矩阵并去重 unique_indices [] for i in range(len(articles)): if embeddings[i] is None: continue # 跳过生成失败的文章 is_duplicate False for u_idx in unique_indices: if embeddings[u_idx] is None: continue # 计算余弦相似度 sim cosine_similarity([embeddings[i]], [embeddings[u_idx]])[0][0] if sim similarity_threshold: is_duplicate True print(f文章 {articles[i][title][:30]}... 与 {articles[u_idx][title][:30]}... 相似度过高 ({sim:.2f})视为重复。) break if not is_duplicate: unique_indices.append(i) unique_articles [articles[i] for i in unique_indices] print(f去重完成原始 {len(articles)} 篇去重后 {len(unique_articles)} 篇。) return unique_articles # 加载原始数据并去重 with open(raw_articles.json, r, encodingutf-8) as f: raw_articles json.load(f) deduplicated_articles semantic_deduplicate(raw_articles) # 保存去重后的数据 with open(deduplicated_articles.json, w, encodingutf-8) as f: json.dump(deduplicated_articles, f, ensure_asciiFalse, indent2)3.2 利用分类与关键词进行质量过滤不是所有抓取到的内容都是我们需要的。比如我们可能只想保留“人工智能”、“编程”相关的技术文章而过滤掉“公司新闻”、“活动通知”等。我们可以让Mirage Flow对文章内容进行分类或者提取关键词然后根据规则过滤。def categorize_and_filter(articles, desired_categories[技术教程, 算法解析, 开发工具]): 对文章进行分类并过滤掉非目标类别的文章 print(开始内容分类与过滤...) filtered_articles [] for art in articles: content_preview art[content][:800] # 取部分内容进行分类 try: # 调用Mirage Flow的分类或文本理解功能 # 这里假设有一个分类端点返回最可能的类别 response client.chat.completions.create( modelyour-classification-model, messages[ {role: system, content: 你是一个文本分类器请判断以下文章内容主要属于哪个类别。只返回类别名称。}, {role: user, content: f文章标题{art[title]}\n内容摘要{content_preview}\n可能的类别技术教程、算法解析、开发工具、行业新闻、活动通知、其他。请直接返回最匹配的一个类别。} ], max_tokens10 ) category response.choices[0].message.content.strip() # 检查是否是我们想要的类别 if any(desired_cat in category for desired_cat in desired_categories): art[predicted_category] category filtered_articles.append(art) print(f保留文章: {art[title][:40]}... - 类别: {category}) else: print(f过滤文章: {art[title][:40]}... - 类别: {category} (非目标类别)) except Exception as e: print(f分类文章 {art[title][:30]}... 时出错: {e}) # 出错时可以根据保守策略决定保留或过滤这里选择过滤 continue print(f过滤完成保留 {len(filtered_articles)} 篇目标类别文章。) return filtered_articles filtered_articles categorize_and_filter(deduplicated_articles) with open(filtered_articles.json, w, encodingutf-8) as f: json.dump(filtered_articles, f, ensure_asciiFalse, indent2)3.3 格式标准化与内容精炼最后我们需要把数据整理成模型训练常用的格式比如每条数据包含“instruction”指令、“input”输入、“output”输出或者简单的“text”字段。同时可以进一步清洗正文比如移除过多的空白符、标准化换行符等。def format_for_training(articles, format_typetext): 将文章数据格式化为训练所需的格式 formatted_data [] for art in articles: # 基础清洗合并多余空白标准化换行 clean_content .join(art[content].split()) clean_content clean_content.replace(\r\n, \n).replace(\r, \n) if format_type text: # 简单文本格式适合自回归语言模型预训练 formatted_text f标题{art[title]}\n标签{, .join(art[tags])}\n发布时间{art[publish_time]}\n\n正文{clean_content}\n\n formatted_data.append({text: formatted_text}) elif format_type instruction: # 指令微调格式可以构造一些简单的QA对 # 例如把标题作为指令正文作为输出 instruction f请根据以下标签和主题生成一篇技术文章{, .join(art[tags])} formatted_data.append({ instruction: instruction, input: , output: f{art[title]}\n\n{clean_content} }) # 可以添加更多格式... return formatted_data training_data format_for_training(filtered_articles, format_typetext) # 保存为最终训练集 with open(final_training_dataset.jsonl, w, encodingutf-8) as f: for item in training_data: f.write(json.dumps(item, ensure_asciiFalse) \n) print(f数据格式化完成共生成 {len(training_data)} 条训练样本已保存至 final_training_dataset.jsonl)4. 整合与优化构建完整数据管道我们把上面的步骤串起来形成一个完整的、可执行的脚本。同时加入一些工程化的考虑比如错误处理、日志记录、配置化参数。# pipeline.py import json import time import logging from pathlib import Path # 假设上述功能函数都已定义在相应的模块中 from crawler import run_crawler from mirage_cleaner import semantic_deduplicate, categorize_and_filter, format_for_training logging.basicConfig(levellogging.INFO, format%(asctime)s - %(levelname)s - %(message)s) logger logging.getLogger(__name__) class DataProcessingPipeline: def __init__(self, config): self.config config self.output_dir Path(config.get(output_dir, ./output)) self.output_dir.mkdir(parentsTrue, exist_okTrue) def run(self): 运行完整管道 logger.info(启动数据预处理管道...) # 阶段1: 采集 logger.info(阶段1: 数据采集) raw_data run_crawler( self.config[crawler][start_url], max_articlesself.config[crawler].get(max_articles, 100) ) self._save_stage_result(raw_data, 01_raw_data.json) # 阶段2: 语义去重 logger.info(阶段2: 语义去重) deduplicated_data semantic_deduplicate( raw_data, similarity_thresholdself.config[cleaner].get(dedup_threshold, 0.85) ) self._save_stage_result(deduplicated_data, 02_deduplicated.json) # 阶段3: 分类过滤 logger.info(阶段3: 分类过滤) filtered_data categorize_and_filter( deduplicated_data, desired_categoriesself.config[cleaner].get(target_categories, [技术教程, 算法解析]) ) self._save_stage_result(filtered_data, 03_filtered.json) # 阶段4: 格式化 logger.info(阶段4: 格式化输出) final_data format_for_training( filtered_data, format_typeself.config[formatter].get(format_type, text) ) self._save_final_dataset(final_data, final_dataset.jsonl) logger.info(f管道执行完毕最终生成 {len(final_data)} 条高质量数据。) return final_data def _save_stage_result(self, data, filename): 保存各阶段中间结果 filepath self.output_dir / filename with open(filepath, w, encodingutf-8) as f: json.dump(data, f, ensure_asciiFalse, indent2) logger.info(f中间结果已保存: {filepath}) def _save_final_dataset(self, data, filename): 保存最终数据集 filepath self.output_dir / filename if filename.endswith(.jsonl): with open(filepath, w, encodingutf-8) as f: for item in data: f.write(json.dumps(item, ensure_asciiFalse) \n) else: with open(filepath, w, encodingutf-8) as f: json.dump(data, f, ensure_asciiFalse, indent2) logger.info(f最终数据集已保存: {filepath}) if __name__ __main__: # 配置参数 config { crawler: { start_url: https://example-blog.com/articles, max_articles: 50 }, cleaner: { dedup_threshold: 0.82, target_categories: [技术教程, 编程, 人工智能] }, formatter: { format_type: text }, output_dir: ./data_output } pipeline DataProcessingPipeline(config) final_dataset pipeline.run()5. 总结走完这一套流程你会发现数据准备的工作量大大减轻了。从目标网站抓取数据到清洗去重、分类过滤再到格式标准化整个过程基本自动化了。你得到的不再是一堆杂乱无章的文本而是一个干净、规整、可以直接用于模型训练的高质量数据集。这套管道的核心价值在于“智能清洗”。传统的规则清洗比如关键词匹配、正则表达式很难处理语义层面的问题而Mirage Flow的语义理解能力正好补上了这块短板。它能理解两段文字在“说什么”而不仅仅是“写了什么字”这让去重和分类的准确率上了个大台阶。当然实际应用中还需要根据你的具体数据源和需求做调整比如处理登录、应对反爬、调整分类规则等。但整体的框架和思路是通用的。下次当你再为数据发愁时不妨试试把这套管道跑起来或许能帮你省下不少功夫。获取更多AI镜像想探索更多AI镜像和应用场景访问 CSDN星图镜像广场提供丰富的预置镜像覆盖大模型推理、图像生成、视频生成、模型微调等多个领域支持一键部署。

更多文章