Gemini 系列的大模型多模态能力在最近两个版本( 2.0flash/2.5pro )有了明显的提升;于是本着干中学的态度,写了一个 PDF 转 Markdown 的在线工具
以下为在线地址,无需登录即可使用:
为控制成本,使用时需填入授权码,下方分享一批授权码
455ae82d7e4a427b86fd19c733da01d4,
83411d78e2a54cce99a5a4d7394c884d,
f90f8ace02fc43079a44afac727cf39d,
ee509e4d307c4c64b48833e00cbae9fa,
35cdc2be66cb41138bf33b6c94c9a55a,
ad68df9f8fa64c84b1cab4463671d935,
59974330e6d346bca8638221d33da4ce
同时贴上提取相关的 python 核心代码,欢迎大佬指正优化
from google import generativeai as genai
import os
from pathlib import Path
from typing import List, Dict
import base64
import mimetypes
import time
from tqdm import tqdm
import tempfile
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
import fitz # PyMuPDF
import uuid
import atexit
import json
class PDFToMarkdownConverter:
def __init__(self, api_key: str, api_endpoint: str = None, chunk_size: int = 20, max_retries: int = 3):
"""
初始化转换器
:param api_key: Gemini API 密钥
:param api_endpoint: 代理服务器地址
:param chunk_size: 每个分块的页数
:param max_retries: 最大重试次数
"""
# 设置日志
logging.basicConfig(level=logging.INFO)
self.logger = logging.getLogger(__name__)
# 基础配置
self.chunk_size = chunk_size
self.max_retries = max_retries
# 创建临时目录
self.temp_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'temp')
os.makedirs(self.temp_dir, exist_ok=True)
self.logger.info(f"临时文件目录: {self.temp_dir}")
# 设置 API 端点
if api_endpoint:
os.environ['GOOGLE_API_BASE_URL'] = api_endpoint
# 配置 API
genai.configure(
api_key=api_key,
transport="rest"
)
# 初始化模型
self.model = genai.GenerativeModel('gemini-2.0-flash')
# 注册退出时的清理函数
atexit.register(self._cleanup_temp_dir)
def _extract_images_from_pdf(self, pdf_path: str, task_id: str) -> Dict[int, List[Dict]]:
"""
从 PDF 中提取图片
:param pdf_path: PDF 文件路径
:param task_id: 任务 ID ,用于生成唯一的图片名称
:return: 按页码索引的图片信息字典 {页码: [图片信息列表]}
"""
images_by_page = {}
doc = fitz.open(pdf_path)
for page_num, page in enumerate(doc):
image_list = page.get_images(full=True)
page_images = []
for img_index, img_info in enumerate(image_list):
xref = img_info[0]
base_image = doc.extract_image(xref)
image_bytes = base_image["image"]
image_ext = base_image["ext"]
# 生成唯一的图片名称
image_filename = f"{task_id}_page{page_num + 1}_img{img_index + 1}.{image_ext}"
# 获取图片在页面中的位置
rect = page.get_image_bbox(img_info)
page_images.append({
"filename": image_filename,
"data": image_bytes,
"ext": image_ext,
"rect": rect,
"xref": xref
})
if page_images:
images_by_page[page_num] = page_images
doc.close()
return images_by_page
def _upload_images_to_s3(self, images_by_page: Dict[int, List[Dict]], s3_service) -> Dict[int, List[Dict]]:
"""
将提取的图片上传到 S3
:param images_by_page: 按页码索引的图片信息字典
:param s3_service: S3 服务实例
:return: 更新后的图片信息字典,包含 S3 URL
"""
for page_num, images in images_by_page.items():
for img in images:
try:
# 上传图片到 S3
file_key = f"images/{img['filename']}"
img_url = s3_service.upload_binary(
img['data'],
file_key,
f"image/{img['ext']}"
)
img['url'] = img_url
except Exception as e:
self.logger.error(f"图片上传失败: {str(e)}")
img['url'] = None
return images_by_page
def _cleanup_temp_dir(self):
"""
清理临时目录
"""
try:
if os.path.exists(self.temp_dir):
for file in os.listdir(self.temp_dir):
try:
file_path = os.path.join(self.temp_dir, file)
if os.path.isfile(file_path):
os.remove(file_path)
except Exception as e:
self.logger.warning(f"清理临时文件失败: {str(e)}")
os.rmdir(self.temp_dir)
except Exception as e:
self.logger.warning(f"清理临时目录失败: {str(e)}")
def _split_pdf(self, pdf_path: str) -> List[str]:
"""
将 PDF 分割成小块
:param pdf_path: PDF 文件路径
:return: 临时 PDF 文件路径列表
"""
temp_files = []
doc = fitz.open(pdf_path)
total_pages = doc.page_count
self.logger.info(f"PDF 总页数: {total_pages}")
for start in range(0, total_pages, self.chunk_size):
end = min(start + self.chunk_size, total_pages)
# 创建新的 PDF 文档
new_doc = fitz.open()
new_doc.insert_pdf(doc, from_page=start, to_page=end - 1)
# 使用 UUID 创建唯一的临时文件名
temp_file_path = os.path.join(self.temp_dir, f'chunk_{uuid.uuid4().hex}.pdf')
new_doc.save(temp_file_path)
new_doc.close()
temp_files.append(temp_file_path)
doc.close()
return temp_files
def _read_pdf_file(self, pdf_path: str) -> dict:
"""
读取 PDF 文件
:param pdf_path: PDF 文件路径
:return: 包含文件内容的字典
"""
try:
with open(pdf_path, 'rb') as file:
content = file.read()
base64_content = base64.b64encode(content).decode('utf-8')
return {
"mime_type": "application/pdf",
"data": base64_content
}
except Exception as e:
raise Exception(f"PDF 文件读取失败: {str(e)}")
def _process_chunk(self, pdf_path: str, keywords: List[str], chunk_index: int,
translation_mode: str = "single", target_language: str = None) -> Dict:
"""
处理单个 PDF 分块
:param pdf_path: PDF 分块文件路径
:param keywords: 关键词列表
:param chunk_index: 分块索引
:param translation_mode: 转换模式 ('single' 或 'bilingual')
:param target_language: 目标语言
:return: 包含处理结果的字典
"""
for attempt in range(self.max_retries):
try:
pdf_data = self._read_pdf_file(pdf_path)
# 获取当前分块的页数信息
doc = fitz.open(pdf_path)
page_info = f"(页码 {chunk_index * self.chunk_size + 1} - {chunk_index * self.chunk_size + doc.page_count})"
doc.close()
# 根据转换模式选择不同的提示词
if translation_mode == "single":
prompt = f"""
请将这份 PDF 文档{page_info}转换为 Markdown 格式。
转换要求:
{', '.join(keywords)}
请确保:
1. 输出格式为标准 Markdown ,但不要用```markdown 包裹全部内容
2. 保持文档的结构和层级
3. 保持内容的完整性
4. 表格要转换为 Markdown 表格格式
5. 代码块要使用正确的格式
6. 正确使用 Markdown 换行语法(使用两个空格或空行实现换行)
"""
else:
prompt = f"""
请将这份 PDF 文档{page_info}转换为双语对照的 Markdown 格式,不要用```markdown 包裹全部内容。
转换要求:
1. 将内容翻译成{target_language}
2. 使用对照模式展示原文和译文
3. {', '.join(keywords)}
输出格式要求:
1. 对于每个章节标题:
原文标题
译文标题
2. 对于正文内容:
原文段落
译文段落
3. 对于表格,输出翻译前后的两个表格的独立:
4. 对于代码块:
保持代码块原样,只翻译注释
请确保:
1. 保持文档的结构和层级
2. 翻译准确且符合目标语言的表达习惯
3. 保持格式的一致性
4. 专业术语的翻译准确
5. 正确使用 Markdown 换行语法(使用两个空格或空行实现换行)
6. 译文用引用的 Markdown 语法包裹以区分
"""
response = self.model.generate_content([prompt, pdf_data])
return {
'index': chunk_index,
'content': response.text,
'success': True
}
except Exception as e:
self.logger.error(f"处理分块{chunk_index}第{attempt + 1}次尝试失败: {str(e)}")
if attempt == self.max_retries - 1:
return {
'index': chunk_index,
'content': f"<!-- 处理失败: {str(e)} -->",
'success': False
}
def convert_to_markdown(self, pdf_path: str, keywords: List[str],
translation_mode: str = "single",
target_language: str = None,
parallel: bool = True,
progress_callback=None,
s3_service=None,
task_id: str = None) -> str:
"""
将 PDF 转换为 Markdown ,包括图片
:param pdf_path: PDF 文件路径
:param keywords: 指导转换的关键词列表
:param translation_mode: 转换模式 ('single' 或 'bilingual')
:param target_language: 目标语言
:param parallel: 是否并行处理
:param progress_callback: 进度回调函数
:param s3_service: S3 服务实例
:param task_id: 任务 ID
:return: Markdown 格式的文本
"""
start_time = time.time()
self.logger.info(f"开始转换... 模式: {translation_mode}" +
(f", 目标语言: {target_language}" if translation_mode == "bilingual" else ""))
temp_files = []
try:
# 提取图片(如果提供了 S3 服务和任务 ID )
images_by_page = {}
if s3_service and task_id:
self.logger.info("开始提取 PDF 中的图片...")
images_by_page = self._extract_images_from_pdf(pdf_path, task_id)
self.logger.info(f"共提取了 {sum(len(imgs) for imgs in images_by_page.values())} 张图片")
# 上传图片到 S3
self.logger.info("开始上传图片到 S3...")
images_by_page = self._upload_images_to_s3(images_by_page, s3_service)
# # 分割 PDF
# temp_files = self._split_pdf(pdf_path)
# chunks_count = len(temp_files)
# self.logger.info(f"PDF 已分割为{chunks_count}个块")
# 添加图片信息到关键词中
if images_by_page:
# 创建一个不包含二进制数据的图片信息字典
image_info_for_json = {}
for page_num, images in images_by_page.items():
image_info_for_json[page_num] = []
for img in images:
# 创建不包含二进制数据的图片信息副本
img_copy = {k: v for k, v in img.items() if k != 'data'}
# 将 Rect 对象转换为列表,以便 JSON 序列化
if 'rect' in img_copy and hasattr(img_copy['rect'], 'to_list'):
img_copy['rect'] = img_copy['rect'].to_list()
elif 'rect' in img_copy:
# 如果没有 to_list 方法,尝试直接转换为列表
img_copy['rect'] = [float(img_copy['rect'][0]), float(img_copy['rect'][1]),
float(img_copy['rect'][2]), float(img_copy['rect'][3])]
image_info_for_json[page_num].append(img_copy)
# 将处理后的图片信息转换为 JSON 字符串
try:
image_info_json = json.dumps(image_info_for_json)
keywords.append(f"PDF 中包含图片,请在适当位置插入图片链接。图片信息: {image_info_json}")
keywords.append("对于每个图片,使用 Markdown 图片语法  插入")
except TypeError as e:
self.logger.error(f"图片信息 JSON 序列化失败: {str(e)}")
# 如果序列化失败,添加简化的图片信息
simple_image_info = {}
for page_num, images in images_by_page.items():
simple_image_info[str(page_num)] = [
{"url": img.get('url', ''), "filename": img.get('filename', '')}
for img in images
]
image_info_json = json.dumps(simple_image_info)
keywords.append(f"PDF 中包含图片,请在适当位置插入图片链接。简化图片信息: {image_info_json}")
keywords.append("对于每个图片,使用 Markdown 图片语法  插入")
# 分割 PDF
temp_files = self._split_pdf(pdf_path)
chunks_count = len(temp_files)
self.logger.info(f"PDF 已分割为{chunks_count}个块")
results = []
processed_count = 0
if parallel:
with ThreadPoolExecutor() as executor:
futures = [
executor.submit(self._process_chunk, temp_file, keywords, idx,
translation_mode, target_language)
for idx, temp_file in enumerate(temp_files)
]
for future in as_completed(futures):
processed_count += 1
if progress_callback:
progress_callback(processed_count, chunks_count)
results.append(future.result())
else:
for idx, temp_file in enumerate(temp_files):
result = self._process_chunk(temp_file, keywords, idx,
translation_mode, target_language)
processed_count += 1
if progress_callback:
progress_callback(processed_count, chunks_count)
results.append(result)
# 按索引排序并合并结果
results.sort(key=lambda x: x['index'])
markdown_content = "\n\n".join(result['content'] for result in results)
# 计算并显示耗时
duration = time.time() - start_time
if duration < 60:
time_str = f"{duration:.2f}秒"
elif duration < 3600:
time_str = f"{duration / 60:.2f}分钟"
else:
time_str = f"{duration / 3600:.2f}小时"
# 输出转换统计信息
self.logger.info(f"转换完成!耗时: {time_str}")
# 统计成功率
success_count = sum(1 for r in results if r['success'])
self.logger.info(f"处理成功率: {success_count}/{chunks_count} " +
f"({success_count / chunks_count * 100:.2f}%)")
# 如果是双语模式,添加文档头部说明
if translation_mode == "bilingual":
# header = f"""# 双语对照文档
# 原文与{target_language}对照
#
# ---
#
# """
markdown_content = markdown_content
return markdown_content
except Exception as e:
error_msg = f"转换过程出错: {str(e)}"
self.logger.error(error_msg)
raise Exception(error_msg)
finally:
# 清理临时文件
for temp_file in temp_files:
try:
if os.path.exists(temp_file):
os.remove(temp_file)
except Exception as e:
self.logger.warning(f"清理临时文件失败: {str(e)}")
def save_markdown(self, markdown_text: str, output_path: str):
"""
保存 Markdown 文件
:param markdown_text: Markdown 文本
:param output_path: 输出文件路径
"""
try:
with open(output_path, 'w', encoding='utf-8') as f:
f.write(markdown_text)
self.logger.info(f"Markdown 文件已保存至: {output_path}")
except Exception as e:
raise Exception(f"文件保存错误: {str(e)}")
def main():
# 配置信息
api_key = "xxxxxxxxxxxxxxx"
api_endpoint = "https://xxxx.xxxxxxxxxxxxxx.com" # 你的代理服务器地址
# 文件路径
pdf_path = "origin.pdf" # 替换为你的 PDF 文件路径
output_path = ("output-doc.md")
# 转换指导关键词
# keywords = [
# "保持文档的标题层级结构",
# "将表格转换为 Markdown 表格格式",
# "如果文档中有公式,注意公式的识别和转换",
# "如果文档中有多栏格式时,注意多栏模式下的阅读顺序保持",
# "保持列表的缩进和格式",
# "突出显示重要内容",
# "保持代码块格式",
# "添加适当的分隔符",
# "注意只需要输出 Markdown 内容,不要做多余的解释",
# "如果发现文档内有图例,可以在需要插入图例的位置插入图片占位符"
# "注意提取的完整性,不要错漏文档的内容"
# ]
keywords = [
"You are an expert OCR assistant. Your job is to extract all text from the provided image and convert it into a well-structured, easy-to-read Markdown document that mirrors the intended structure of the original. Follow these precise guidelines:",
"Use Markdown headings, paragraphs, lists, and tables to match the document's hierarchy and flow.",
"For tables, use standard Markdown table syntax and merge cells if needed. If a table has a title, include it as plain text above the table.",
"Render mathematical formulas with LaTeX syntax: use $...$ for inline and $$...$$ for display equations.",
"For images, use the syntax  with a clear, descriptive alt text.",
"Remove unnecessary line breaks so that the text flows naturally without awkward breaks.",
"Your final Markdown output must be direct text (do not wrap it in code blocks).",
"Ensure your output is clear, accurate, and faithfully reflects the original image's content and structure."
]
# 转换模式配置
translation_mode = "single" # 可选 "single" 或 "bilingual"
target_language = "Chinese" # 目标语言,如 "英语"、"日语" 等
try:
# 根据转换模式设置输出文件名
base_name, ext = os.path.splitext(output_path)
if translation_mode == "bilingual":
output_path = f"{base_name}_{target_language}{ext}"
# 初始化转换器
converter = PDFToMarkdownConverter(
api_key=api_key,
api_endpoint=api_endpoint,
chunk_size=3,
max_retries=3
)
# 转换 PDF 到 Markdown
print(f"开始转换... 模式: {translation_mode}" +
(f", 目标语言: {target_language}" if translation_mode == "bilingual" else ""))
markdown_content = converter.convert_to_markdown(
pdf_path=pdf_path,
keywords=keywords,
translation_mode=translation_mode,
target_language=target_language,
parallel=True
)
# 保存结果
converter.save_markdown(markdown_content, output_path)
print(f"转换完成!文件已保存至: {output_path}")
except Exception as e:
print(f"转换过程中出现错误: {str(e)}")
if __name__ == "__main__":
main()
1
windamin 16 天前
用了 59974330e6d346bca8638221d33da4ce
之前一直被在这个问题困扰 想问下直接用模型识别,一个 50 页的大量图片的 pdf 转出来成本如何? |
3
l864494871 16 天前 via iPhone
👍
|
4
2han9wen71an 16 天前
有一个 124 页的 pdf 想试一下,结果授权码只能最大 100 页
|
![]() |
5
mcutown OP |
6
2han9wen71an 16 天前
换了一个简单的 pdf,为什么图片都变成了图片占位符?
|
![]() |
7
mcutown OP @2han9wen71an
因为目前的处理逻辑是对可能为文章插图的内容进行智能识别并以占位符替代。这样方便后续对附图使用 pymupdf 提取后并插入还原 |
![]() |
8
hugodotlau 16 天前
来个跟开源项目的对比吧,比如 makeitdown 啥的,不然没啥验证的热情了
|
![]() |
9
cz5424 16 天前
巧了,今天也看到楼上说的微软做的 makeitdown ; https://github.com/microsoft/markitdown ;这个还能支持 office 全家桶
|
![]() |
10
RoccoShi 16 天前
和微软开源的 markitdown 比优势在哪呢,人家都 50k star 了还一直在迭代。
|
![]() |
11
billzhuang 16 天前
Cloudflare Worker AI 也有这个服务,https://developers.cloudflare.com/workers-ai/markdown-conversion/
我之前试过,emm ,也就那样。 |
![]() |
12
mcutown OP 统一回复楼上各位。
目前市面上 PDF 内容结构化做的比较好的除了 markitdown 外,还有 Mineru 、docling 。以上这些项目因为工作关系,我都有比较深入的体验,对比我这里贴出的小工具来说,工程化的项目可能在某些场景下无疑更具有稳定性。 但我个人觉得,对于非深度或者非工程化人员,轻量脚本级的应用在使用上应该是更加灵活的 但是,这些工具都存在一个不可能三角,即效率、成本、质量,任何一个产品都无法同时具备;如果有需要,我可以单开一个帖子对以上 PDF 内容结构化项目进行综合评测 |
![]() |
13
lizhenda 16 天前
Gemini api 价格怎么样?
|
![]() |
14
mcutown OP ![]() |