function cleanWithProgress(markdown, fileName, sourceType, progressCallback):
# ============ Step 0:原始文档展示 ============
progressCallback.accept(stepDone(0, "格式转换", markdown.length))
# ============ Step 1:规则预处理(主大头,8 子步骤,无 LLM)============
# 1.0 HTML 表格转 Markdown 表格(确保后面被识别为 TABLE 段)
prepared = HtmlTableConverter.convertHtmlTables(markdown)
# 1.1 ⭐ 分段:TEXT / TABLE / CODE_BLOCK
segments = SegmentParser.parseSegments(prepared)
# → 后续每段差异化处理,代码块原样保留
# 1.2 各段处理(按段类型分流)
for seg in segments:
if seg.type == TEXT:
seg = TextPreprocessor.ruleBasedPreprocess(seg, sourceType) # [rule-preprocess]
seg = LatexCleaner.wrapResidualMathAsCodeBlock(seg) # 残留 LaTeX → 代码块兜底
elif seg.type == TABLE:
seg = LatexCleaner 系列(seg) # MinerU 表格 \mathrm{m} 残留
elif seg.type == CODE_BLOCK:
pass # 原样不动
# 1.3 ⭐ 断行合并(PPT 跳过 — 短行列表合并会破坏结构)
if not sourceType.isPpt():
for seg in TEXT 段:
seg = LineProcessor.mergeTruncatedHeadings(seg) # 合并截断标题
seg = LineProcessor.removeFalseEmptyLines(seg)
seg = LineProcessor.mergeLineBreaks(seg) # 规则:确定/不确定
seg = LineProcessor.splitClauseParagraphs(seg) # 条款拆分
收集 uncertainBreaks # [line-merge]
# LLM 批量判断不确定的断行
if uncertainBreaks 非空 and llmConfigured:
llmResults = LlmMergeJudge.judge(uncertainBreaks)
应用 → LineProcessor.applyLlmMergeResults
# 失败降级:LineProcessor.ruleBasedLineBreakMerge(宽松规则)
# 1.4 行内嵌入标题拆分(PPT/MARKDOWN 跳过)
if not sourceType.isPpt() and not sourceType.isMarkdown():
for seg in TEXT 段:
seg = HeadingProcessor.splitEmbeddedTitles(seg) # [heading-processor]
# 1.5 隐式标题提取("一、xxx。正文..."→ 拆成独立标题行)
for seg in TEXT 段:
seg = HeadingProcessor.splitImplicitHeadings(seg) # 必须在 1.6 之前
# 1.6 ⭐ 目录删除 + 标题层级检测(Java 端核心)
for seg in TEXT 段:
seg = CleanerUtils.removeTocBlock(seg) # 删目录段
seg = NumberingSchemeDetector.detectAndFormat(seg) # [heading-regex]
# 1.7 代码块自动检测(检测未包裹的代码片段加 ```)
for seg in TEXT 段:
seg = CodeBlockDetector.detect(seg) # [code-block-detect]
preprocessed = SegmentParser.reassembleSegments(segments) # 重组(须在 1.8 去重之前:去重作用于重组后的整篇文本)
# 1.8 段落级去重(仅 VLM 来源)
if sourceType.isVlm():
    preprocessed = CleanerUtils.removeDuplicateParagraphs(preprocessed)
# ============ Step 1.9:标题正文未分离判定 ============
titleCount = 统计 preprocessed 中的 # 标题行数
needsHeadingGen = (titleCount == 0)
if not needsHeadingGen and HeadingProcessor.isHeadingBodyMixed(preprocessed, titleCount):
preprocessed = HeadingProcessor.stripHeadingMarkers(preprocessed)
needsHeadingGen = true
# ============ Step 2A:无标题文档 → 语义分块生成标题 ============
if needsHeadingGen and preprocessed.length > 500 and llmConfigured:
withHeadings = SemanticHeadingGenerator.generate(preprocessed, baseLevel=2) # [subtitle-gen]
if withHeadings == null: # embedding 挂了
withHeadings = SubtitleGenerator.generateHeadingStructure(...) # 降级 LLM
if withHeadings != null:
preprocessed = withHeadings
# ============ Step 2B:超长段落补子标题 ============
longSections = SubtitleGenerator.findLongSections(preprocessed, threshold=1500)
for si, ls in enumerate(longSections): # si 作为 replacements 的键
enhanced = SemanticHeadingGenerator.generate(ls.content, ls.level+1) # 优先
if enhanced == null:
enhanced = SubtitleGenerator.generateSubTitles(ls.content, ls.level+1) # 降级
if enhanced != null:
replacements[si] = enhanced
if replacements 非空:
按行号重建文档(allLines, longSections, replacements) # 用 startLine/endLine 精确替换
return preprocessed
# ⭐ 一图看清:每个 chip 在哪一步
# Step 1.1 → md-segments Step 1.2 → rule-preprocess
# Step 1.3 → line-merge Step 1.4/1.5 → heading-processor
# Step 1.6 → heading-regex Step 1.7 → code-block-detect
# Step 2A/2B → subtitle-gen
# ============ Step 1.4:splitEmbeddedTitles 行内多标题拆分 ============
function splitEmbeddedTitles(text):
# 场景:PDF 抽取后一行里有多个标题挤在一起
# 例如: "(一) 指导思想 (二) 总体目标 (三) 主要任务"
# → 后续标题检测只能识别第一个
for line in lines:
if line.length < 20: continue # 短行不处理
提取 # 前缀(如有)
rawContent = 去掉 # 后的内容
# ⭐ 扫描行内,找拆分点
splitPoints = []
for pos in 1..rawContent.length:
prev = rawContent[pos-1]
# ⭐ 关键:仅在前一字符是 ) ) 。 ; 时才考虑拆
# 避免误拆正文中间的"3.5倍"、"100.00元"
if prev not in [')', ')', '。', ';']: continue
remaining = stripBoldMarkers(rawContent[pos:])
# ⭐ 用 LEVEL1-3 patterns(不用 LEVEL4 — \d+[.、] 行内太容易误匹配)
for pattern in [TITLE_LEVEL1, TITLE_LEVEL2, TITLE_LEVEL3]:
if pattern.lookingAt(remaining):
splitPoints.add(pos)
break
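# 按拆分点切割,每个标题独立成行
if splitPoints 为空: 原样输出 line; continue
start = 0
for sp in splitPoints + [rawContent.length]:   # 追加行尾作为终点,最后一个标题段也要输出
    seg = rawContent[start:sp]
    输出 (if start == 0: hashPrefix + seg else: seg)
    输出 "" # 空行分隔
    start = sp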
# ============ Step 1.5:splitImplicitHeadings 隐式标题提取 ============
function splitImplicitHeadings(text):
# 场景:"编号 + 标题 + 正文" 合并到一行
# 例:"(一) 指导思想。坚持以习近平新时代中国特色社会主义思想为指导..."
# → 拆成:
# "(一) 指导思想"
# ""
# "坚持以习近平新时代..."
# ⭐ 必须在 NumberingSchemeDetector 之前!否则整行都被标为标题
for line in lines:
raw = line.replaceFirst("^#+\\s*", "")
if raw.length <= 80: continue # 短行不处理
pt = NumberingSchemeDetector.detectType(raw)
if pt == null: continue # 没编号类型,不动
# ⭐ 跳过法律条款类型(第X章/节/条) — 这些由 NumberingSchemeDetector 内部处理
if pt in [ARTICLE, CHAPTER, SECTION]: continue
# 找第一个句末标点 。 或 ; 作为标题结束点
splitPos = 第一个 "。" 或 ";" 的位置(扫前 100 字符)
if 0 < splitPos <= 50:
# 句号在 50 字内:标题不会超过 50 字
输出 raw[0..splitPos] # 标题
输出 "" # 空行
输出 raw[splitPos+1:] # 正文
elif 50 < splitPos <= 100:
# 50-100 字也拆(超长标题)
同上拆分
else:
# 没句号 / > 100 字 → 不拆,保留原样
# ============ Step 1.9:isHeadingBodyMixed 判定整篇是否需要重新生成标题 ============
function isHeadingBodyMixed(text, titleCount):
# 场景:MinerU 把所有内容都标成 #(标题密集 + 没正文)
# → 没法切分,得剥掉 # 让 SemanticHeadingGenerator 重新生成
if titleCount < 3: return false
for heading line:
# 看本标题到下一个标题之间是否有非空行
hasBody = 后续到下个 # 之间存在非空行
if not hasBody: headingsWithNoBody++
# ⭐ 阈值:≥ 80% 标题后没正文 → 判定为"未分离"
return headingsWithNoBody >= totalHeadings × 0.8
# ============ stripHeadingMarkers:剥除全部 # 标记 ============
function stripHeadingMarkers(text):
return text.replaceAll("(?m)^#+\\s+", "")
# 还原成纯文本,准备交给 SemanticHeadingGenerator(Step 2A)
# ============ 三个方法在流水线中的关系 ============
# Step 1.4 splitEmbeddedTitles 解决"一行多标题"
# Step 1.5 splitImplicitHeadings 解决"标题+正文合并"
# Step 1.9 isHeadingBodyMixed 判定"整篇需要重新生成标题"
# + stripHeadingMarkers 剥除 # 让 Step 2A 重新生成
#
# 这三个保证 NumberingSchemeDetector(Step 1.6)拿到的是规范的"一行一标题"格式
function detect(text):
for i, line in lines:
trimmed = line.trim()
# ============ 跳过场景(不检测)============
if trimmed.startsWith("```"): # 已有代码块,不动
inCodeBlock = !inCodeBlock
continue
if inCodeBlock: continue # 在已有代码块内
if trimmed.startsWith("#"): continue # 标题行
if 表格行("|" 开头 and "|" 结尾): continue # 表格
if trimmed.startsWith("$$"): # LaTeX 公式块
跳过到下一个 "$$" # 防止 $$ 被误识别为 bash $
# ============ 路径 A:连续缩进块检测 ============
if isIndentedLine(line): # 4+ 空格 或 tab 开头
blockStart = i
codeLineCount = 0
codeFeatureCount = 0
j = i   # 从当前行开始扫描,当前缩进行本身也计入
while j < lines.size:
    if isIndentedLine(lines[j]):
        codeLineCount++
        if hasCodeFeature(lines[j]):
            codeFeatureCount++
    elif lines[j].empty(): pass # 空行不打断
    else: break
    j++
# ⭐ 高置信度判定:≥3 缩进行 + 至少 1 行有代码特征
if codeLineCount >= 3 and codeFeatureCount >= 1:
    lang = guessLanguage(lines[blockStart..j-1])
    输出 "```" + lang
    for k in blockStart..j-1:   # 与上面 guessLanguage 的范围一致,不含第 j 行
        输出 stripLeadingIndent(lines[k]) # 去 4 空格 / tab 缩进
    输出 "```"
    i = j
    continue   # 该块已输出,不再进入路径 B
# ============ 路径 B:命令行模式检测 ============
if 匹配 CODE_COMMAND_PATTERN:
# 行首匹配:$ / >>> / ... / pip / npm / apt / yum / docker / kubectl / curl / wget / git / mvn / gradle
收集连续命令行(空行不打断)
# ⭐ 高置信度判定:连续 ≥ 2 行同模式
if cmdLines >= 2:
lang = (">>>" 开头) ? "python" : "bash"
包裹 ```lang ... ```
# ============ 关键 helper ============
function isIndentedLine(line):
return line.length > 4 and (line.startsWith("    ") or line.startsWith("\t"))   # 4 个空格 或 tab 开头
function hasCodeFeature(trimmed):
# 满足任一:
return 匹配 CODE_KEYWORD_PATTERN # def/class/function/import/from/var/const/let
# SELECT/INSERT/CREATE/DROP
# public/private/protected/return
# if(/for(/while(/try{/catch(
or 匹配 CODE_SYNTAX_PATTERN # 行尾是 [{}();]
or 含 "=" / "->" / "::"
or 行首 "@"(注解) / "//"(注释) / "/*"
function guessLanguage(blockText):
    # 扫描内容关键字推测语言(自上而下匹配,先命中者生效)
    含 "def / import / print(" → "python"
    含 "function / const / =>" → "javascript"
    含 "public / private / class" → "java"
    含 "SELECT / INSERT / FROM",且不是"FROM 与 RUN 同时出现" → "sql"   # 否则 Dockerfile 的 FROM 会先被判成 sql
    含 "server { / location" → "nginx"
    含 "apiVersion: / kind:" → "yaml"
    含 "\"name\": / \"version\":" → "json"
    同时含 "FROM" 和 "RUN" → "dockerfile"
    其他 → ""(空,通用)
# ============ 工程价值 ============
# ⭐ 防止代码片段被切分破坏:切分时 SegmentParser 看到 ``` 会保护代码块
# ⭐ 防止代码片段被 LLM 错"理解":LLM 清洗时 CODE_BLOCK 段不动
# ⭐ guessLanguage 自动加 lang → 渲染时有语法高亮
# ⭐ 保守策略避免误标:文档里偶尔贴的指令、变量名不会被错框为代码块
# ============ 入口:LineProcessor 一组规则 + LlmMergeJudge 兜底 ============
# ① mergeTruncatedHeadings:截断标题合并
function mergeTruncatedHeadings(text):
    for i, line in lines:
        if line.startsWith("#"):
            找下一个 # 行(跳过最多 3 个空行)
            if 同级
               and 当前不以 "。?!;:))" 结尾
               and 下一行 < 20 字符
               and 下一行不是新标题:
                if 当前 > 15 字符:
                    # ⭐ 高置信度 → 直接合并
                    result.add("#" * level + " " + 当前 + 下一行)
                    跳过下一行
                else:
                    # ⭐ 低置信度(可能是两个独立短标题)→ 收集 UncertainBreak,扔 LLM
                    uncertainBreaks.add({
                        lineIndex: result.size,
                        prevContext: 当前末 30 字,
                        nextContext: 下一行
                    })
# ② removeFalseEmptyLines:删 PDF 转换产生的虚假空行
function removeFalseEmptyLines(text):
    for i, line in lines:
        if line.empty() and 前后都非空:
            if 前不以终结标点结尾 and 前后都是中文/CJK标点:
                continue # 删掉这个空行
        # 否则保留(段落分隔的真空行)
        result.add(line)
# ③ mergeLineBreaks:主合并(三态决策)
function mergeLineBreaks(text):
    if lines 为空: return text
    current = lines[0]
    for i, line in lines[1:]:
        decision = decideMerge(current, line)
        # 返回 MERGE / SKIP / UNCERTAIN 三态
        switch decision:
            case MERGE:
                # 高置信度合并
                current = current + line   # 直接拼接
            case SKIP:
                # 高置信度不合并
                result.add(current)
                current = line
            case UNCERTAIN:
                # ⭐ 不确定 → 暂不合并,记下来扔 LLM
                # 必须在 current 被替换为 line 之前取 prevContext,否则取到的是后一行
                uncertainBreaks.add({
                    lineIndex: result.size,        # 即 current 将写入的位置
                    prevContext: current 末 30 字,
                    nextContext: line 首 30 字
                })
                result.add(current)
                current = line
    result.add(current)   # 循环结束后提交最后一行
# ============ ⭐ 编排器:在 LlmDocumentCleaner Step 1.3 ============
function 编排():
    # 1. 规则阶段:先收集所有 uncertainBreaks
    allUncertain = []
    for seg in TEXT 段:
        seg, breaks = LineProcessor.mergeTruncatedHeadings(seg)
        allUncertain.addAll(breaks)
        seg = LineProcessor.removeFalseEmptyLines(seg)
        seg, breaks = LineProcessor.mergeLineBreaks(seg)
        allUncertain.addAll(breaks)
        seg = LineProcessor.splitClauseParagraphs(seg)   # 条款拆分
    # 注意:lineIndex 是段内行号,跨段汇总时需同时记录所属段,apply 时按段筛选,否则不同段的行号会互相冲突
    # 2. LLM 阶段:批量判断所有不确定的
    if allUncertain 非空 and llmConfigured:
        try:
            llmResults = LlmMergeJudge.judge(allUncertain)   # ⭐
            for seg in TEXT 段:
                seg = LineProcessor.applyLlmMergeResults(seg, allUncertain, llmResults)
        except Exception:
            # ⭐ LLM 失败降级:对每个 TEXT 段都用规则宽松合并(不能只处理循环残留的最后一个 seg)
            for seg in TEXT 段:
                seg = LineProcessor.ruleBasedLineBreakMerge(seg)
# ============ ⭐⭐ LlmMergeJudge.judge:核心 LLM 调用 ============
function judge(breaks):
    # 一次 batch prompt,列出所有不确定点
    prompt = """
以下是文档中的断行位置,每组给出断行前 30 字和断行后 30 字。
请判断每处断行是否是 PDF/Word 转换产生的错误断行(应该合并为一句话)。
只回答 Y(应合并)或 N(不合并),每行一个,不要解释。
1. 前文:「...」
   后文:「...」
2. 前文:「...」
   后文:「...」
...
"""
    response = llmClient.chat(prompt, "doc_clean_merge_judge")
    # 解析 LLM 输出:每行 Y/N
    answers = response.split("[\\n,;]+")
    results = []
    for answer in answers:
        if answer.trim().empty(): continue   # 跳过首尾换行产生的空串,避免答案与断行序号错位
        cleaned = answer.upper().replaceAll("[^YN]", "")
        results.add(cleaned.startsWith("Y"))   # Y → 合并, N → 不合并
    # 与 breaks 逐条对齐:答案不足的按 false 补齐(保守:不合并),多余的截断
    results = (results + [false] × breaks.size)[:breaks.size]
    # 异常兜底:全部返回 false(保守:不合并)
    return results
# ============ 这种"两阶段"架构的工程价值 ============
# ⭐ 高置信度的(行尾在中文中间、句号结尾)规则秒判,不浪费 LLM
# ⭐ 低置信度的(短标题、专业名词截断)才调 LLM,准确度极高
# ⭐ 一次 batch 把所有不确定的丢 LLM,N 次单调用变 1 次
# ⭐ LLM 挂了走宽松规则降级,不会卡住整个清洗
# ⭐ Prompt 简单粗暴只要 Y/N → 单条 token 极少,batch 规模可大
function processPptSlides(convertResult, fileName):
slides = convertResult.slides # 含每页元数据 SlideMeta(tables/images/charts/textLength)
taskId = convertResult.taskId
# ============ 阶段 1+2:并发下载截图 + VLM 识别 ============
线程池 = newFixedThreadPool(max(1, min(5, slides.size))) # daemon worker;slides 为空时线程数不能为 0
pageResults[slides.size]
for idx, slide in slides 并发:
# 2.1 下载截图 + 持久化到本地
png = restTemplate.exchange("/slide/{taskId}/{imageName}")
imageProcessService.saveImageToLocal(taskId, slide.imageName, png)
base64 = Base64.encode(png)
# 2.2 ⭐ 按页面元数据动态选模型
meta = slide.meta
isComplex = (meta == null) or
meta.tables > 0 or meta.images > 0 or meta.charts > 0
useLite = (not isComplex) and llmConfig.isVisionLiteConfigured()
model = useLite ? llmConfig.visionLiteModel : llmConfig.visionModel # lite 还是 72B
# 2.3 重试 + 429 限流处理(最多 3 次)
for attempt in 1..3:
try:
result = llmClient.visionChatWithUsage(
prompt = LlmPromptConstants.PPT_PAGE_RECOGNITION_PROMPT,
image = base64, model = model)
if result.success: break
sleep(attempt × 3秒)
except TooManyRequests:
sleep(15 + random(10) 秒)
attempt-- # 限流不计入重试次数(需另设 429 累计等待上限,否则持续限流时会无限重试)
# 2.4 ⭐ lite 全失败 → 回退 72B 再试一次
if useLite and result 仍失败:
result = llmClient.visionChatWithUsage(model = visionModel)
# 2.5 异步记录 token 用量(成本核算)
tokenUsageService.recordAsync(result, "ppt_vision_extract", ...)
pageResults[idx] = result.content
SSE.推进度("已完成 N/M 页")
CompletableFuture.allOf(futures).join()
# ============ 阶段 3:结构化整编 ============
# 3.1 下载 PPT 内嵌的原始图片 (slide{n}_img{m}.png 模式)
slideImageMap = {}
for imgName in convertResult.imageNames 匹配 "slide(\d+)_img\d+\..*":
slideImageMap[slideNum].add(imgName)
downloadAndSave(imgName)
# 3.2 过滤无效页
for i, content in pageResults:
if 空 or content == "[导航页]": continue
# 提取标题:首行 "## " 或 "# " 开头
title = 首行 if 首行.startsWith("#") else ""
body = 剩余正文
# 清理 PPT 页脚噪声(物芯页脚、Vision DT 标识、孤立数字、"### 页脚信息")
body = cleanPptPageBody(body)
# ⭐ 正文去空白 < 80 字符 → 视为导航/过渡页,丢弃
if len(body.replaceAll("\\s+", "")) < 80: continue
validPages.add({ idx: i, title, body })
# 3.3 ⭐ 按"标题核心主题"分组相邻页
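function extractPptCoreTopic(title):
    # 去掉开头的 # 标记、" - 副标题",以及末尾的(括号备注)
    core = title.replaceFirst("^#+\\s*", "")          # title 取自 "## xxx" 首行,需先剥掉 #,否则拼出 "## ## xxx"
    core = core.split(" - ")[0]                         # 保留 " - " 之前的主标题,丢弃副标题
    core = core.replaceAll("[((][^((]*[))]$", "")     # 只删末尾一对括号;用 .*? 会从第一个左括号一直删到行尾
    return core.trim()
function isSimilarPptTitle(t1, t2):
    c1, c2 = extractPptCoreTopic(t1), extractPptCoreTopic(t2)
    if c1.empty() or c2.empty(): return false   # 空主题会让 startsWith("") 恒为 true,把无标题页并进任意组
    return c1 == c2 or c1.startsWith(c2) or c2.startsWith(c1)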
# 滑动窗口找分组边界
groupStart = 0
for v in 1..validPages.size:
if v == validPages.size or
not isSimilarPptTitle(validPages[groupStart].title, validPages[v].title):
groupRanges.add([groupStart, v])
groupStart = v
# 3.4 拼接 markdown(每组用 ## 主题,贪心打包到 1500 字 chunk)
for [from, to] in groupRanges:
groupTitle = extractPptCoreTopic(validPages[from].title) or "第N页"
# 每页拼一段:截图 + 正文 + 嵌入的原始图片
pageSegments = []
for v in from..to-1:   # groupRanges 中的 [from, to) 为左闭右开区间
    seg = "![](" + 第 v 页截图路径 + ")\n\n" + body + "\n\n"
    seg += slideImageMap[slideNum] 拼接的 "![](" + imgName + ")"
pageSegments.add(seg)
# 贪心打包:按 MAX_PPT_CHUNK_CHARS = 1500 拆分
subChunks = packPptSegments(pageSegments, max=1500)
for chunk in subChunks:
cleanedMd += "## " + groupTitle + "\n\n" + chunk
# 3.5 超长单段 → 按"段内最高级标题边界"拆分(splitAtTopHeading)
# 检测正文中出现的最高级 # 标题层级 → 沿该层级标题切
cleanCacheService.save(fileName, cleanedMd, "ppt_vision")
return cleanedMd
function processPdfVlm(convertResult, fileName):
pages = convertResult.slides
taskId = convertResult.taskId
# ============ 阶段 1+2:并发下载 + VLM 识别 ============
total = pages.size
线程池 = ContextAwareExecutor.wrap(newFixedThreadPool(max(1, min(5, total))))   # 页数为 0 时线程数不能为 0
# ⭐ 透传 UserContext(并发执行下也能拿到当前用户/租户)
for idx, page in pages 并发:
png = restTemplate.exchange("/slide/{taskId}/{imageName}")
imageProcessService.saveImageToLocal(taskId, page.imageName, png)
base64 = Base64.encode(png)
# ⭐ PDF 全部用 72B 模型(信息密度高,不用 lite)
model = llmConfig.visionModel
# 重试 + 429 限流(同 PPT)
for attempt in 1..3:
try:
result = llmClient.visionChatWithUsage(
prompt = LlmPromptConstants.PDF_PAGE_RECOGNITION_PROMPT,
image = base64, model = model)
if result.success: break
sleep(attempt × 3秒)
except TooManyRequests:
sleep(15 + random(10) 秒)
attempt-- # 限流不计入重试次数(需另设 429 累计等待上限,否则持续限流时会无限重试)
tokenUsageService.recordAsync(result, "pdf_vlm_extract")
pageResults[idx] = result.content
SSE.推进度("已完成 N/M 页")
# ============ 跨页去重 ============
deduplicateAdjacentPages(pageResults, total)
# 去除完全相同的相邻段落(VLM 偶尔会把页眉页脚识别成正文)
# ============ ⭐ 页缝修复(repairPageBoundaries)============
function repairPageBoundaries(pageResults, total):
# 找出所有相邻有效页对(都不是跳过页)
boundaryPairs = []
for i in 0..total-2:   # 配对 [i, i+1],i+1 不能越过最后一页
if 都非空 and 都不是 "[目录页]"/"[非正文页]"/"[导航页]":
boundaryPairs.add([i, i+1])
# 5 线程并发处理每个边界
for [idxA, idxB] in boundaryPairs 并发:
tail = pageResults[A] 末 5 行
head = pageResults[B] 头 5 行
# ⭐ 快速预判:已断句结尾 → 直接跳过(省 LLM 调用)
if A末行 以 "。/?/!/|" 结尾 or 空:
continue
# 调 LLM 判断这两行是否需要合并
userMsg = "【上页末尾】" + tail + "\n\n【下页开头】" + head
result = llmClient.chatWithUsage(
prompt = LlmPromptConstants.PAGE_BOUNDARY_REPAIR_PROMPT,
userMsg = userMsg)
# LLM 输出 JSON: { merge: bool, tail: 修正版末行, head: 修正版首行 }
json = parse(result.content)
if not json.merge: continue
# synchronized 写回(避免并发改 pageResults 数组)
synchronized(pageResults):
pageResults[A] 末行 = json.tail
pageResults[B] 首行 = json.head
# ============ 后处理 ============
postProcessPages(pageResults)
# 去除连续相同段落(跨页未捕获到的兜底)
# ============ 拼接 + 清理伪标题 ============
rawMd = ""
for page in pageResults:
if page in ["[目录页]", "[非正文页]", "[导航页]"]: continue
page = cleanPageHeadFalseHeadings(page) # VLM 易把页首句子误标为 ##
rawMd += page + "\n\n"
rawMd = postProcessFinalMarkdown(rawMd) # 整篇统一后处理
# ============ ⭐ LLM 文档清洗(分子步骤,SSE 推进度)============
# 走 LlmDocumentCleaner.cleanWithProgress() — 内部包含:
# - 规则预处理(详见 rule-preprocess 弹窗)
# - LLM 辅助清洗(标题断行修复、句段拼回、噪声段识别)
finalMd = llmDocumentCleaner.cleanWithProgress(
rawMd, fileName, CleanSourceType.PDF_VLM,
progressCallback = SSE 推子步骤)
cleanCacheService.save(fileName, finalMd, "pdf_vlm")
return finalMd
function ruleBasedPreprocess(markdown, sourceType):
lines = markdown.split("\n", -1)
result = []
for line in lines:
trimmed = CleanerUtils.stripUnicode(line)
# ① 清理不可见字符 \f
trimmed = INVISIBLE_CHARS.replace("")
# ② 删页码行(7 种正则,任一匹配整行丢弃)
if 匹配 PAGE_NUM_DOLLAR ("^\\$-\\s*\\d+\\s*-\\$\\s*$"): continue # $- 12 -$
if 匹配 PAGE_NUM_DASH ("^—\\s*\\d+\\s*—\\s*$"): continue # — 5 —
if 匹配 PAGE_NUM_BARE_DASH ("^-\\s*\\d+\\s*-\\s*$"): continue # - 12 -
if 匹配 PAGE_NUM_CHINESE ("^第\\s*\\d+\\s*页\\s*$"): continue # 第 12 页
if 匹配 PAGE_NUM_ENGLISH ("^Page\\s+\\d+\\s+of\\s+\\d+"): continue # Page 5 of 20
if 匹配 PAGE_NUM_STANDALONE("^\\d{1,3}\\s*$"): continue # 纯数字行
if 匹配 PAGE_NUM_DOT ("^[·•.]\\s*\\d+\\s*[·•.]"): continue # ·5·
# ③ 删噪声行(版权/出版/装订符号等,_NOISE_PATTERNS 共 30+ 条)
if CleanerUtils.isNoiseLine(trimmed): continue
if 匹配 "^\\*{1,3}\\s*\\*{1,3}$": continue # 装饰符号 ** **
# ④ HTML 实体解码
trimmed = trimmed.replace("&lt;","<").replace("&gt;",">").replace("&amp;","&")   # &amp; 最后解码,避免 "&amp;lt;" 被二次解码成 "<"
# ⑤ 半角括号转全角:(一) → (一)(只对中文数字)
trimmed = HALF_WIDTH_PAREN.replaceAll("($1)")
# ⑥ LaTeX 数学符号残留清理:$x^2$ → x²
trimmed = LatexCleaner.LATEX_DISPLAY.replace("$1")
trimmed = LatexCleaner.LATEX_INLINE.replace("$1")
trimmed = LatexCleaner.cleanLatex(trimmed)
trimmed = LatexCleaner.fixSpacedDigits(trimmed) # 修 OCR 数字间距
# ⑦ 修复 OCR 小数编号点号前后空格
"3.0 .2" → "3.0.2" (DECIMAL_SPACE_BEFORE_DOT)
"5.0. 2" → "5.0.2" (DECIMAL_SPACE_AFTER_DOT)
# ⑧ ⭐ CJK 标题字间距修复(仅对 # 开头的标题行)
if trimmed.startsWith("#"):
"# 水 库 调 度" → "# 水库调度"
(SPACED_CJK_SEGMENT,匹配"单中文字符+空格"段去掉空格)
# ⑨ 表/图编号前多余空格
"表 3-1" → "表3-1" (TABLE_FIG_SPACE)
# ⑩ 清理空图片描述块
trimmed = EMPTY_IMAGE_DESC.replace("") # 【图片描述】 → 删
trimmed = EMPTY_IMAGE_REF.replace("") # ![]() → 删
# ⑪ ⭐ 仅 PDF/VLM 来源:OCR 系统性纠错 + 单位修复
if sourceType.isPdf():
trimmed = OcrCorrector.correctOcr(trimmed) # 常见 OCR 错字纠正
trimmed = OcrCorrector.fixUnits(trimmed) # 单位格式(km/h, m³ 等)
# ⑫ 行尾内联页码清理
trimmed = INLINE_PAGE_NUM_TAIL.replace("")
# ⑬ ⭐ 仅 VLM:重复幻觉修复(VLM 偶尔会重复输出同一段落)
if sourceType.isVlm():
trimmed = VLM_REPEAT.replace("$1") # 匹配 (.{4,50}?)\1{2,}
result.add(trimmed)
# ⑭ ⭐ 删除连续重复行(仅 VLM)
if sourceType.isVlm():
deduped = []
for t in result:
if deduped and t.trim() == deduped[-1].trim():
continue
deduped.add(t)
result = deduped
# ⑮ 合并连续空行 \n{3,} → \n\n
return String.join("\n", result).replaceAll(MULTI_BLANK_LINES, "\n\n")
# ============ ⭐ 主流程:SemanticHeadingGenerator.generate ============
function generate(text, baseLevel, fileName):
# baseLevel:无标题文档=H2,超长段落补标题=父标题+1
if text == null or text.length < 200: return null # 短文本不处理
# ① 按句切分(全角中文标点)
sentences = splitSentences(text) # 按全角 。?!; 切
if sentences.size < 3: return null # 句子太少不处理
# ② ⭐ 一次 batch embedding(失败 → return null,由 LlmDocumentCleaner 降级)
try:
embeddings = llmClient.batchEmbed(sentences)
except Exception:
log.warn("embedding调用失败,降级")
return null # ⛔ 关键:挂了让上层降级
if embeddings.size != sentences.size: return null
# ③ ⭐ 算"相邻"句子的余弦相似度(找语义跳变)
sims = []
for i in 0..embeddings.size-2:   # 相邻对 (i, i+1),i+1 不能越界
    sims.add(cosine(embeddings[i], embeddings[i+1])) # sims[i] = sim(句i, 句i+1)
# ④ ⭐⭐ P20 百分位阈值 → 相似度低于此 = 话题边界(跳变剧烈处)
boundaries = findBoundaries(sims, BOUNDARY_PERCENTILE = 20)
# 内部:
# threshold = percentile(sims, 20) # 取相似度的 20 分位数作为阈值
# for i, sim in sims:
# if sim < threshold: # 越低 = 上下两句语义差距越大
# boundaries.add(i + 1) # 在第 i+1 句处划分
if boundaries.empty: return null # 没找到跳变,不强行加标题
log.info("语义分块: %d句, %d边界, sim=[%.3f, %.3f], P20=%.3f")
# ⑤ 按边界把句子列表分段
segments = splitByBoundaries(sentences, boundaries)
# ⑥ ⭐ 碎片合并:< 100 字符的段合并到前一段(避免分得太碎)
segments = mergeFragments(segments, FRAGMENT_THRESHOLD = 100)
# 第一段如果太碎,反过来合并到第二段
if segments.size < 2: return null
# ⑦ ⭐ LLM 给每段命名(每段调一次)
prefix = "#" × baseLevel + " " # H2 → "## "
result = ""
for i, segment in segments:
title = generateTitle(segment)
# 内部:只送前 300 字给 LLM(省 token)
# prompt = "请为以下文本生成一个简短的中文标题(10字以内),只输出标题,不要解释:\n\n" + segment[:300]
# 清理 LLM 输出的引号 / "标题:" 前缀
# 失败兜底:返回 "未命名段落"
result += prefix + title + "\n\n" + segment + "\n\n"
return result
# ============ 调度入口:LlmDocumentCleaner ============
# 检测到无标题文档 / 超长段落:
#
# 优先:SemanticHeadingGenerator.generate() ⭐ 主流程(就是上面)
# 返回 null(embedding 不可用)
# ↓
# 降级:SubtitleGenerator.generateHeadingStructure() / generateSubTitles()
# 给段落编号 [1] [2] [3]... 让 LLM 一次性返回 JSON [{para, title, level}]
# 不依赖 embedding,纯 LLM 完成
# ============ 关键参数 ============
BOUNDARY_PERCENTILE = 20 # 相似度 P20 阈值找话题边界
FRAGMENT_THRESHOLD = 100 # 段长 < 100 字符 → 合并到前一段
TITLE_PREVIEW_CHARS = 300 # 只送前 300 字给 LLM 命名(省 token)
TITLE_MAX_CHARS = 10 # LLM 标题 ≤ 10 字
# ============ 句切分细节 ============
function splitSentences(text):
    # 中文全角标点:。 ? ! ;
    sentences = []
    current = ""
    for char in text:
        current.append(char)
        if char in ['。', '?', '!', ';']:   # 全角
            sentences.add(current)
            current.clear()
    # 末尾余下的也算一句
    if current 非空: sentences.add(current)
# ============ 关键洞察 ============
# ⭐ 跟 ② 切分 sibling-sim 的区别:
# sibling-sim 算的是"标题节点的兄弟"之间相似度,做拆/合决策
# 本算法算的是"相邻句子"之间相似度,在跳变处插入新标题
# 用途完全不同:一个判段落归属,一个生成新标题
# ============ Step 0:已有多级 # heading 处理(智能选择路径)============
function detectAndFormat(text):
existingLevels = 扫描所有 # 标记的标题
if existingLevels.size >= 2 and totalExisting >= 5:
if 文档含 DECIMAL 编号(X.X / X.X.X / X.X.X.X):
# MinerU 的 text_level 经常标反 → 剥除编号行的 # 重新检测
剥除编号行的 # 标记,继续走完整流程
else:
# 信任已有结构,只压缩层级(若 H1/H3/H5 → H1/H2/H3)
return remapExistingHeadings(...)
# ============ Step 1:scanCandidates 扫描候选 ============
candidates = scanCandidates(lines)
# 14 种 PatternType,按优先级匹配:
# DECIMAL_4 → ^\d+\.\d+\.\d+\.\d+ (3.1.2.5)
# DECIMAL_3 → ^\d+\.\d+\.\d+ (3.1.2)
# DECIMAL_2 → ^\d+\.\d+ (3.1)
# CHAPTER → ^第[一二...\d]+[章部编篇] (第三章)
# SECTION → ^第[一二...\d]+节 (第三节)
# ARTICLE → ^第[一二...\d]+条 (第十二条)
# CHINESE_NUM → ^[一二...十]+[、.] (一、)
# PAREN_CHINESE → ^[((][一二...十]+[))] ((一) / (一))
# ROMAN_UPPER/LOWER → I. / i.
# ALPHA_DECIMAL → ^[A-Z]\.\d+ (A.1)
# UPPER_ALPHA / LOWER_ALPHA → A. / a.
# DIGIT_DOT → ^\d+[.、 ] (1. / 1 总则)
# DIGIT_PAREN → ^\d+\) (1))
# PAREN_DIGIT → ^[((]\d+[))] ((1) / (1))
# CIRCLE_DIGIT → ^[①②③④...] (①)
# 跳过:代码块/表格/注释标记(//、--、**注、<!--)/列表项前缀
# ============ Step 2:filterHeadingTypes 哪些类型是标题 ============
headingTypes = {}
for ptype, cands in 按类型分组(candidates):
if ptype in DECIMAL_HIERARCHY: # 小数编号天然层级,直接保留
headingTypes.add(ptype); continue
if ptype in STRONG_PATTERNS: # ⭐ 章/节/条:出现 1 次也保留
headingTypes.add(ptype); continue
if 某候选已有 # 标记: # MinerU/上游已确认
headingTypes.add(ptype); continue
if count == 1 and 后面 20 行内有嵌套子级:
headingTypes.add(ptype); continue
if count >= 2: # ⭐ 弱模式:行间距判定
adjacentCount = 紧邻数(line gap == 1)
if adjacentCount > gapCount/2 and count >= 3:
# 看起来像列表项,但有 2 个例外:
if 跨多个独立区段(被其他类型分隔) >= 2:
headingTypes.add(ptype); continue # 是子标题
if ptype == DIGIT_DOT and 有对应 DECIMAL 子标题:
headingTypes.add(ptype); continue # 是章节号(1 总则 → 1.1 ...)
continue # 列表项,不当标题
headingTypes.add(ptype)
# ============ Step 3:⭐⭐ assignLevels 分段栈推导层级 ============
# 这才是用户原话「扫到一、xxx 直到二、xxx,中间是更细层级」的真实实现
# Phase 1:确定 H1 类型
h1Type = 第一个出现次数 >= 2 的类型
if 第一个候选的类型 != h1Type and 它后面 10 行内有嵌套:
h1Type = 第一个候选的类型 # 它才是真 H1
# Phase 2:分段推导(核心算法)
sectionMap = {} # 类型 → 已分配层级
sectionStack = [] # 当前活跃的子类型栈
deepestDecimal = 0   # 已出现的最深 DECIMAL 层级;非 DECIMAL 的新类型至少排在它下一级
h1IsDecimal = h1Type in DECIMAL_HIERARCHY
for c in filtered:
if c.type in DECIMAL_HIERARCHY:
# DECIMAL 类型全局固定:X.X=H2, X.X.X=H3, X.X.X.X=H4
if c.type == h1Type:
level = 1
elif h1IsDecimal:
level = 1 + DECIMAL_HIERARCHY[c.type] - DECIMAL_HIERARCHY[h1Type]
else:
level = 2 + DECIMAL_HIERARCHY[c.type]
c.level = level
deepestDecimal = max(deepestDecimal, level)   # 供下方新类型的 depth 下限使用
continue
if c.type == h1Type:
# ⭐ 遇到 H1 类型 → 重置栈(每个 H1 章节内独立分配子层级)
sectionMap.clear()
sectionStack.clear()
c.level = 1
else:
if c.type not in sectionMap:
# 新类型:depth = 当前栈深 + 2(H1 之下从 H2 开始)
depth = sectionStack.size + 2
depth = max(depth, deepestDecimal + 1)
sectionMap[c.type] = depth
sectionStack.add(c.type)
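else:
    # 已知类型:回退到该类型在栈中的位置,清掉之后的
    idx = sectionStack.indexOf(c.type)
    if idx >= 0:
        sectionStack[idx+1:].clear()
    else:
        # 该类型曾被更浅层类型的回退清出栈(但仍在 sectionMap 中)→ 重新入栈、沿用原层级;
        # 若直接用 idx = -1 执行 sectionStack[0:].clear() 会清空整栈,后续新类型层级被算浅
        sectionStack.add(c.type)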
c.level = sectionMap[c.type]
# ============ ⭐ 用户原话「扫到一、xxx 中间更细层级」的具体例子 ============
#
# 「一、总则」 → CHINESE_NUM,选为 h1Type → H1, sectionStack=[]
# 「(一) 适用对象」 → PAREN_CHINESE,新类型 → depth=2 → H2, sectionStack=[PAREN_CHINESE]
# 「(1) 自然人」 → PAREN_DIGIT,新类型 → depth=3 → H3, sectionStack=[PC, PD]
# 「① 16-60周岁」 → CIRCLE_DIGIT,新类型 → depth=4 → H4, sectionStack=[PC, PD, CD]
#
# ===== 进入「二、组织架构」 =====
#
# 「二、组织架构」 → CHINESE_NUM == h1Type → ⭐ 重置 sectionStack/sectionMap → H1
# 「(一) 岗位」 → PAREN_CHINESE,栈空,depth=2 → H2(跟第一节的"适用对象"层级一致 ✓)
#
# ===== 同一编号在不同文档可能不同层级 =====
#
# 文档 A:「(一)」 后面没有更细的子级 → h1Type 可能就是 PAREN_CHINESE → "(一)" = H1
# 文档 B:「一、」 是 H1, "(一)" 嵌套在下面 → "(一)" = H2
# → 没有"全局映射",每篇文档独立推导自己的层级树 ✓
# ============ 跟 Python 端 md_builder.py 的关系 ============
# Python 端 md_builder.py 是 MinerU 后端用的辅助修复,
# 跟 Java 端 NumberingSchemeDetector 是两个独立实现。
# Python 端逻辑较简单(用 last_num_level 状态机 + 8 个全局映射),
# Java 端是完整的"自适应层级树推导"(14 种 pattern + 强弱模式 + 列表项过滤 + 分段栈)。
# ============ 第 2 道:handleOversizeChunks 超长拆分 ============
# 阈值:> 1.2 × maxChunkSize 才拆(给点弹性:maxChunkSize=512 时阈值≈614,600 字不拆,700 字才拆)
# overlap:15% maxChunkSize(防语义断裂)
# tinyTail:1/5 maxChunkSize(末段太短折回上一段)
function handleOversizeChunks(chunks, maxSize, includePath, enableOverlap):
    splitThreshold = maxSize × 1.2
    tinyTail = maxSize / 5
    for chunk in chunks:
        if chunk.length <= splitThreshold: 直接保留; continue
        # ⭐ 表格作为原子单位:连续 | 行合成一段
        paragraphs = 按 \n\n 切,但连续 | 行合并为一个 table 段
        parts = []
        current = ""
        for para in paragraphs:
            # 单表 > 阈值:走 splitLargeTable(按行拆,每段保留表头)
            if para 是表格 and para.length > splitThreshold:
                提交非空 current 并置空(保证表格片段排在它之后,维持原文顺序)
                splitLargeTable(para, maxSize, pathPrefix, parts)
                continue
            # 普通段累加,超 maxSize 就开新段(current 为空时不提交空 chunk)
            if current 非空 and current + para > maxSize:
                提交 current 为新 chunk
                if includePath: 新段开头加「【路径】(续)\n\n」
                if enableOverlap:
                    # ⭐ overlap 句子边界对齐(extractOverlapTail)
                    取上段末 15% maxSize → 找第一个句末标点 [。;!?\n] 截断 → 拼到新段开头
                current = overlap(若有)
            current += para
        提交最后的 current(非空时)
        # ⭐ 碎尾折回:至少 2 段且最后一段 < 1/5 maxSize → 合并到倒数第二段(避免产生碎尾)
        if parts.size >= 2 and parts[-1].length < tinyTail:
            parts[-2] += parts[-1]; remove parts[-1]
# ============ 第 1 道:mergeSiblingChunks 同父碎片合并 ============
# 阈值:< 200 字符(SIBLING_MERGE_FRAGMENT_THRESHOLD)
# 约束:必须同父(getParentPath 取标题路径去掉最后一段)
function mergeSiblingChunks(chunks, maxChunkSize):
    for i, current in chunks:
        if current.length >= 200:   # 非碎片直接保留
            result.add(current); continue
        currentParent = getParentPath(current.path)
        # ⭐ 优先向下合并(碎片作为下段引言更自然)
        if i + 1 < chunks.size and chunks[i+1] 同父 and 合并后 <= maxChunkSize:
            chunks[i+1].chunk = current.chunk + "\n\n" + chunks[i+1].chunk
            continue
        # 向下不行尝试向上合并(到 result 末尾的同父 chunk)
        if result 非空 and result[-1] 同父 and 合并后 <= maxChunkSize:
            result[-1].chunk += "\n\n" + current.chunk
            continue
        result.add(current)   # 都不行就独立保留
# ⭐ 注:语义合并由 ChunkPlanService(embedding 兄弟相似度)在 plan 阶段已完成
# 这里只处理 plan 产生的剩余碎片(标题行、短前言等)
# ============ 第 3 道:mergeSmallChunks 跨父小段合并(基于关键词相似度)============
# 阈值:pureTextLength < 80 字符(FRAGMENT_BODY_THRESHOLD)
# pureTextLength 排除:图片引用行、# 标题、【路径】前缀、空行
function mergeSmallChunks(chunks, includePath):
# ① 标记每个 chunk 是否为碎片
isFragment[i] = pureTextLength(chunks[i]) < 80
# ② ⭐⭐ 关键算法:基于 bigram 关键词的 Jaccard 相似度
# 决定每个碎片是向前合还是向后合
for i, frag in 碎片:
fragKw = ChunkTextUtils.extractBigramKeywords(frag.content)
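prevTail = 找前一个非碎片 chunk,取末 200 字(不存在则无 prevIdx)
nextHead = 找后一个非碎片 chunk,取头 200 字(不存在则无 nextIdx)
prevSim = jaccard(fragKw, extractBigram(prevTail))
nextSim = jaccard(fragKw, extractBigram(nextHead))
# ⭐ 合到相似度更高的那个;相同则向下(引言更自然);只有一侧存在非碎片 chunk 时直接合到该侧
if prevIdx 不存在: mergeTarget[i] = nextIdx
elif nextIdx 不存在: mergeTarget[i] = prevIdx
else: mergeTarget[i] = (prevSim > nextSim) ? prevIdx : nextIdx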
# ③ 执行合并:吸收方按时序排,前面的碎片当前缀,后面的当后缀
for base, fragments in absorbMap:
for fi in fragments where fi < baseIdx:
base.chunk = chunks[fi].chunk + "\n\n" + base.chunk # 前缀
for fi in fragments where fi > baseIdx:
base.chunk += "\n\n" + chunks[fi].chunk # 后缀
# ④ 兜底:全是碎片时,合成一个
# ============ 第 4 道:filterNonChineseChunks 中文文档过滤英文段 ============
# 阈值:中文字符占比 < 10%(CHINESE_RATIO_THRESHOLD)→ 过滤
# 用途:中文文档常混入英文目录、英文翻译、版权声明、ISBN 等无检索价值的段
function filterNonChineseChunks(chunks):
for chunk in chunks:
ratio = chineseCharRatio(chunk.content)
# ⭐ 排除空白和标点,只对"可见非标点字符"算占比
# chinese = U+4E00..9FFF 或 U+3400..4DBF
if ratio >= 0.10: result.add(chunk)
# 否则丢弃
# ============ 4 道精修在 chunkMarkdown 主流程的调用顺序 ============
# chunks = mergeToChunks(sections) # plan 切分
# chunks = mergeSiblingChunks(chunks, maxChunkSize) # 第 1 道
# chunks = handleOversizeChunks(chunks, maxChunkSize, ...) # 第 2 道
# chunks = mergeSmallChunks(chunks, includePath) # 第 3 道
# chunks = filterNonChineseChunks(chunks) # 第 4 道
# tocChunk = tocChunkService.generateTocChunk(...) # 加目录块
# ============ 主入口 chunkMarkdown(content, fileName, request) ============
function chunkMarkdown(content, fileName, request):
# ① 抽文档元数据(标题、发布单位、文号、日期)
extractResult = extractDocumentMetadata(content)
metadata = extractResult.metadata
processedContent = extractResult.remainingContent
# ② 抽尾部元数据(落款、抄送、印发日期),从正文剥离
processedContent = extractDocumentFooter(processedContent, metadata)
# ③ 按 # 标题切成 Section 树
sections = parseMarkdownStructure(processedContent)
# ④ 主切分:按 splitLevel 切 / 或按 ChunkPlanService 自适应 plan 切
chunks = mergeToChunks(sections, request)
# ↓ 内部:
# for section in sections:
# if section.level <= splitLevel: # 层级不深于 splitLevel 的标题(level 数值 ≤ splitLevel)作为切点
# 收当前 chunk → 起新 chunk
# else: # 更深层级的标题
# 合并到当前 chunk(保留 # 标题前缀)
# ============ ⭐ 4 道精修 (chunkFormatService) ============
# ⑤ 合并同父碎片 chunk(< 200 字,优先并入下一个兄弟;语义合并已在 plan 阶段完成)
chunks = chunkFormatService.mergeSiblingChunks(chunks, maxChunkSize)
# ⑥ 处理超长 chunk:按段落(\n\n)再拆,超大表格按行拆并保留表头,续段带路径(includePath)
chunks = chunkFormatService.handleOversizeChunks(chunks, maxChunkSize, includePath, true)
# ⑦ 合并过小 chunk(防止零碎片段)
chunks = chunkFormatService.mergeSmallChunks(chunks, includePath)
# ⑧ 过滤非中文 chunk(英文目录、纯符号、纯数字行 等)
chunks = chunkFormatService.filterNonChineseChunks(chunks)
# ============ ⑨ ⭐ 加目录概览块(非 PPT 文档)============
isPptOrigin = fileName 以 .pptx/.ppt 结尾 or 匹配 *_cleaned.md
if not isPptOrigin:
tocChunk = tocChunkService.generateTocChunk(sections, metadata)
# tocChunk 内容 = 标题列表 + 元数据,用于回答"这文档讲什么/有哪些章节"等全局问题
if tocChunk != null:
chunks.insert(0, tocChunk) # 插到首位
重排 chunkId
# ============ ⑩ 输出 ============
stats = buildStats(chunks, sections) # H1/H2/H3 数、avg/max/min 长度、含图块数
return ChunkResultVO(chunks, stats, metadata)
# ============ ChunkVO 元数据 (供检索过滤) ============
function buildChunkVO(section, index, includePath):
# 每个 chunk 的元数据:
chunk.path = "H1 / H2 / H3" # 标题路径,RAGFlow 检索可按 path 前缀过滤
chunk.level = section.level
chunk.title = section.title
chunk.images = [...] # 该 chunk 含的图片引用
chunk.chunk = (if includePath: H1/H2/H3 \n\n) + 内容
function generateSplitPlan(sections, maxChunkSize, contentHash):
# ① 预计算:每个 section 的 mergedSize(自身 + 所有后代)+ 直接子节数
for i in sections.indexes:
parentLevel = sections[i].level
total = sections[i].content.length
for j in i+1..end:
if sections[j].level <= parentLevel: break # 出了子树范围
if sections[j].level == parentLevel + 1: childCounts[i]++
total += sections[j].title.length + sections[j].content.length
mergedSizes[i] = total
# ② ⭐ 核心:计算同级兄弟两两 embedding 相似度
siblingSimCache = computeSiblingEmbeddingSimilarity(sections)
# 内部展开:
# 2.1 按父级分组(parentToSiblings),只处理"组内 ≥ 2 个兄弟"的
# 2.2 收集所有要 embed 的文本:每段 title + content[:200] (内容只取前 200 字)
# 2.3 ⭐ 一次 batch:vectors = llmClient.batchEmbed(embedTexts)
# → 一次 HTTP 调用拿全部向量(避免 N+1)
# 2.4 ⭐ 失败兜底:catch Exception → return {} → 后续走规则判断
# 2.5 给每组兄弟算两两 cosine:
# result[父级:兄弟a索引] = [cosine(va, v1), cosine(va, v2), ...] 数组
# ③ ⭐ 决策每个 section:拆 or 合
nodes = []
for i, s in sections:
if s.level == 0 or (i == 0 and s.title.empty()):
# 文档前言
split = not s.content.empty()
reason = split ? "文档引言" : "空内容"
elif s.level <= 1:
# ⭐ 一级标题:独立章节,始终拆(永不合并)
split = true
reason = "一级标题,独立章节"
else:
# 二级及以下:默认拆,满足条件才合
parentIdx = findParentIndex(sections, i)
canMerge = parentIdx >= 0 and mergedSizes[parentIdx] <= maxChunkSize
sim = getSiblingSimForIndex(siblingSimCache, sections, i)
if not canMerge:
split = true
reason = "合并超限,独立分段"
elif sim >= 0 and sim > getSiblingSimMedian(...):
    # ⭐ 与兄弟相似度高于"组内中位数"→ 语义相关,合并到父级
    split = false
    reason = "语义相关({sim:.2f}),合并到父级"   # 例:语义相关(0.78)
elif sim >= 0:
    # 低于中位数 → 主题不同,独立分段
    split = true
    reason = "语义差异({sim:.2f}),独立分段"   # 例:语义差异(0.42)
else:
# ⭐ embedding 挂了 / 没兄弟 → 规则兜底
siblings = findSiblingIndices(sections, i)
if len(siblings) <= 2 and canMerge:
split = false; reason = "子节少且不超限,合并"
else:
split = true; reason = "默认独立分段"
nodes.add(PlanNode(i, title, level, charCount, childCount, split, reason))
# ④ ⭐ 自动推荐 maxChunkSize(基于最小粒度标题的平均字数,前端建议值)
suggestedMaxChunkSize = suggestMaxChunkSize(sections)
return SplitPlanVO(nodes, stats {
totalSections, splitCount, mergeCount,
estimatedChunks, estimatedAvgSize,
suggestedMaxChunkSize
})
function parseSegments(markdown):
    # Split markdown into an ordered list of DocSegments typed TEXT, TABLE or CODE_BLOCK,
    # so later cleaning steps can treat each kind differently. Concatenating the segments'
    # lines in order reproduces the input lines.
    #
    # Classification rules:
    #   - A line whose trimmed form starts with ``` toggles a fenced code block. Both fence
    #     lines belong to the CODE_BLOCK segment; lines inside a fence are never classified.
    #   - Outside fences, a non-blank line whose trimmed form starts AND ends with '|' is a
    #     table row; consecutive rows form one TABLE segment.
    #   - Every other line belongs to a TEXT segment.
    #   - A segment with no lines is never emitted.
    segments = []
    current = DocSegment(TEXT)
    insideFence = false
    for rawLine in markdown.split("\n", -1):        # limit -1 keeps trailing empty lines
        stripped = rawLine.trim()
        isFence = stripped.startsWith("```")
        if isFence and insideFence:
            # Closing fence: it ends the code block, and plain text resumes afterwards.
            current.lines.add(rawLine)
            current = commitAndOpen(segments, current, TEXT)
            insideFence = false
        elif isFence:
            # Opening fence: close whatever was open and start the code block with this line.
            current = commitAndOpen(segments, current, CODE_BLOCK)
            current.lines.add(rawLine)
            insideFence = true
        elif insideFence:
            current.lines.add(rawLine)              # code is kept verbatim
        else:
            isTableRow = stripped.length > 0 and stripped.startsWith("|") and stripped.endsWith("|")
            if isTableRow != (current.type == TABLE):
                # Crossing a table boundary (into or out of a table) starts a new segment.
                current = commitAndOpen(segments, current, isTableRow ? TABLE : TEXT)
            current.lines.add(rawLine)
    commitAndOpen(segments, current, TEXT)          # flush the trailing segment; the new one is unused
    return segments

# Append `segment` to `segments` when it holds any lines, then return a fresh empty segment of `nextType`.
function commitAndOpen(segments, segment, nextType):
    if not segment.lines.empty(): segments.add(segment)
    return DocSegment(nextType)
# ============ 实际使用位置:LlmDocumentCleaner.java:71 ============
# 在 ① 清洗阶段最后一步「LLM 文档清洗」(PDF VLM 管线尾部走它)调用:
#
# String prepared = HtmlTableConverter.convertHtmlTables(markdown)
# List<DocSegment> segments = SegmentParser.parseSegments(prepared)
#
# for seg in segments:
# if seg.type == TEXT:
# seg = TextPreprocessor.ruleBasedPreprocess(seg, sourceType) # 规则清洗
# seg = LatexCleaner.wrapResidualMathAsCodeBlock(seg) # 残留 LaTeX 兜底
# elif seg.type == TABLE:
# 只做 LaTeX 清理(MinerU 表格 \mathrm{m} 残留多)
# elif seg.type == CODE_BLOCK:
# 不动(保护)
#
# String final = SegmentParser.reassembleSegments(segments) # 100% 还原原顺序
# ⭐ 这种"100% 准确的规则判定"在 RAG 工程里常见:
# 能用确定性规则解决的,绝不依赖 LLM 或向量(快、稳、零成本)
# ============ 入口:processTask 异步并发 ============
function processTask(context):
    # Generate QA for every chunk of a task concurrently on llmExecutor, then set the final status.
    # Per-chunk failures are isolated: they increment context.failed and never abort other chunks.
    context.status = "running"
    context.message = "正在并行生成 QA(LLM_CONCURRENCY 路并发)..."
    chunks = context.chunks
    futures = []
    for i, chunk in enumerate(chunks):
        # Resolve per-iteration values before creating the task so each task captures its own
        # chunk and predecessor (no late binding of loop variables inside the closure).
        currentChunk = chunk
        prevChunk = (i > 0) ? chunks[i-1].chunk : null
        future = CompletableFuture.runAsync(() => {
            if context.cancelled: return                    # cooperative cancellation
            try:
                result = generateSingleChunkQA(currentChunk, context.docTitle, prevChunk)
                if result != null and result.question1 != null:
                    context.results[currentChunk.chunkId] = result
                    context.completed.incrementAndGet()
                else:
                    context.failed.incrementAndGet()
            except Exception as e:
                # Isolate the failure, but keep the cause visible instead of swallowing it.
                log.warn("QA generation failed for chunk " + currentChunk.chunkId, e)
                context.failed.incrementAndGet()
            notifyProgress(context)
        }, llmExecutor)     # llmExecutor is the ContextAwareExecutor.wrap pool, so UserContext propagates
        futures.add(future)
    CompletableFuture.allOf(futures).join()
    # Cancellation takes precedence: a cancelled run has chunks that were never attempted,
    # so comparing the failure count with the total is meaningless for it.
    # An empty task (total == 0) has nothing that failed and is reported as completed.
    if context.cancelled:
        context.status = "cancelled"
    elif context.total > 0 and context.failed.get() == context.total:
        context.status = "failed"
    else:
        context.status = "completed"
# ============ 单 chunk QA 生成 ============
function generateSingleChunkQA(chunk, docTitle, prevChunkContent):
    # Ask the LLM for two evaluation questions that the given chunk should answer.
    #
    # chunk            : chunk object; chunk.path is its H1/H2/H3 heading path, chunk.chunk its text.
    # docTitle         : document title, included as context.
    # prevChunkContent : text of the preceding chunk, or null for the first chunk.
    # returns          : ChunkWithQA(chunk.chunkId, chunk, question1, question2). A question may be
    #                    null if the LLM omitted it (the caller checks question1). JSON parse errors
    #                    propagate to the caller.
    #
    # The more structural context (title, section path, previous chunk) the prompt has, the
    # better the questions fit the document.
    userMsg = ""
    userMsg += "文档标题:" + docTitle + "\n"
    userMsg += "所属章节:" + chunk.path + "\n"
    # The first chunk has no predecessor; slicing null would throw and fail every first chunk.
    if prevChunkContent != null and not prevChunkContent.isEmpty():
        userMsg += "上一段摘要:" + prevChunkContent[:200] + "\n"     # first 200 characters only
    userMsg += "\n文档片段内容:\n" + chunk.chunk
    # Cached chat: key = MD5(prompt + userMsg), 7-day Redis TTL, so re-running QA generation on
    # an unchanged chunk is served from the cache.
    result = llmClient.cachedChatWithUsage(
        prompt = LlmPromptConstants.QA_GENERATION_PROMPT,
        userMsg = userMsg,
        cacheNs = "qa",
        cacheTTL = Duration.ofDays(7))
    # Cache hits are flagged so they are not counted as token cost.
    tokenUsageService.recordAsync(result, LlmCallType.QA_GENERATE,
        cacheHit = result.isCacheHit())
    # The reply may be a ```json fenced block or bare JSON; extractJson strips the fence.
    json = parseObject(extractJson(result.content))
    question1 = json.getString("question1")
    question2 = json.getString("question2")         # two questions per chunk
    return ChunkWithQA(chunk.chunkId, chunk, question1, question2)
# ============ 落库(走 QAMappingServiceImpl.saveMappings)============
function saveMappings(chunkId, sourceId, documentId, questions):
    # Persist one QaMapping row per generated question of a chunk (QAMappingServiceImpl).
    #
    # questions: typically [question1, question2]. Upstream only question1 is checked for null,
    # so question2 may be missing; blank entries are skipped instead of being hashed.
    for q in questions:
        if q == null or q.isBlank(): continue
        questionHash = MD5(q.bytes(UTF-8))          # backs the unique index used for deduplication
        qaMapping.save(QaMapping{
            chunkId, sourceId, documentId,
            question = q, questionHash = questionHash
        })
    # Large volumes are saved in batches (BatchUtils.processBatch) to avoid OOM.
    # NOTE(review): the loop above saves row by row; the batching is not visible here — confirm.
    # These (question, expected chunk) pairs are the ground truth that GridSearch evaluates against.
# ============ 阶段 1:构造检索组合(笛卡尔积)============
function buildRetrievalCombos(config):
    # Cartesian product of all tunable retrieval parameters.
    # The threshold varies slowest and the rerank model fastest.
    # Result size = |thresholds| × |weights| × |topKs| × |rerankModels|.
    return [
        RetrievalCombo(threshold, weight, topK, reranker)
        for threshold in config.similarityThresholds     # e.g. [0.2, 0.4, 0.6]
        for weight in config.vectorWeights               # e.g. [0.3, 0.5, 0.7]
        for topK in config.topKs                         # e.g. [3, 5, 10]
        for reranker in config.rerankModels              # e.g. ["", "bge-reranker"]
    ]
# ============ 阶段 2:批量检索 + 评估(双层池架构)============
function batchRetrieveAndEvaluate(ctx, mappings, retrievalCombos, ...):
    # Grid search stages 2 and 2.5: retrieve once per (question, weight, rerankModel), then
    # derive the metrics of every retrieval combo from those cached results.
    #
    # Only vectorWeight and rerankModel change what RAGFlow returns. The similarity threshold and
    # topK are post-filters, so each question is retrieved once per unique (weight, rerankModel)
    # pair with topK=1024 and similarity=0.0. This sharply reduces the number of RAGFlow calls.
    uniqueWR = {}
    for rc in retrievalCombos:
        key = rc.vectorWeight + "_" + rc.rerankModel
        uniqueWR[key] = WeightRerankPair(rc.vectorWeight, rc.rerankModel)
    wrPairs = uniqueWR.values
    allResults = []

    # Two separate pools: outer (weight, rerank) tasks block on their inner retrieval tasks, so
    # sharing one pool can deadlock once every worker is an outer task.
    # Both are wrapped with ContextAwareExecutor so UserContext reaches the workers.
    # The outer size is at least 1 because newFixedThreadPool rejects a size of 0 (empty wrPairs).
    comboPool = ContextAwareExecutor.wrap(newFixedThreadPool(max(1, min(3, wrPairs.size))))
    retrievePool = ContextAwareExecutor.wrap(newFixedThreadPool(ragFlowConfig.retrieveConcurrency))  # default 8
    # Two cache layers: Redis (24 h TTL, reused across tasks) and this in-memory map
    # "qIdx_weight_rerank" -> full result list, which stage 2.5 reads.
    retrieveCache = ConcurrentHashMap()
    try:
        for wrPair in wrPairs (concurrently on comboPool, futures collected in comboFutures):
            if ctx.cancelled: return
            weight = wrPair.weight
            rerank = wrPair.rerankModel
            for qIdx, mapping in mappings (concurrently on retrievePool, futures collected in qaFutures):
                if ctx.cancelled: return
                # Use the same dataset list as the retrieval call so the key matches what is fetched.
                redisKey = buildRetrieveCacheKey(ctx.datasetIds, mapping.questionHash, weight, rerank)
                # ① Redis first (24 h TTL: the same question + parameters are reused across tasks).
                results = redisService.get(redisKey)
                if results != null:
                    ctx.cacheHits.incrementAndGet()
                else:
                    # ② Miss: call RAGFlow. Transient errors are retried; a failure raises, so only
                    #    successful results are ever written to either cache.
                    results = doRetrieveWithRetry(
                        question = mapping.question,
                        datasets = ctx.datasetIds,      # positive-sample datasets + noise datasets
                        weight = weight,
                        rerankModel = rerank,
                        topK = 1024,                    # max topK; trimmed per combo in stage 2.5
                        similarity = 0.0                # min threshold; filtered per combo in stage 2.5
                    )
                    # ③ Store in Redis for 24 h.
                    redisService.set(redisKey, results, Duration.ofHours(24))
                # Both hit and miss paths must land in the in-memory cache that stage 2.5 reads.
                retrieveCache[qIdx + "_" + weight + "_" + rerank] = results
            # Per-pair timeout max(300 s, 2 s per question) so one stuck pair can't hang the whole task.
            timeoutPerCombo = max(300, mappings.size × 2)
            CompletableFuture.allOf(qaFutures).get(timeoutPerCombo, SECONDS)
            ctx.completedWeightReranks.incrementAndGet()
            notifyProgress()
        # Overall timeout max(600 s, 2 s per (pair, question)).
        totalTimeout = max(600, wrPairs.size × mappings.size × 2)
        CompletableFuture.allOf(comboFutures).get(totalTimeout, SECONDS)
    finally:
        # Both pools are created per call; shut both down on every exit path (timeouts included).
        comboPool.shutdown()
        retrievePool.shutdown()
    debugLogRetrievalDiagnosis(...)     # cache hit rate / retrieval failure statistics

    # Stage 2.5: derive every combo's metrics from the in-memory cache (no more API calls).
    for rc in retrievalCombos:
        if ctx.cancelled: break
        comboResult = evalService.evaluateComboFromCache(
            mappings, retrieveCache, rc.threshold, rc.weight, rc.topK, rc.rerankModel)
        allResults.add(comboResult)
        done = ctx.completedCombos.incrementAndGet()
        if done % 20 == 0: notifyProgress()
    saveIncrementalResults(ctx, allResults)     # incremental persistence lets an interrupted run resume
# ============ 检索带重试的关键 helper ============
function doRetrieveWithRetry(question, datasets, ..., maxRetries = 1):
    # Call RAGFlow retrieval, retrying only failures that may be transient.
    #
    # Transient (SocketTimeout, HTTP 5xx): sleep 3 s and try again, at most `maxRetries` extra
    #   attempts (default 1); the final failure is re-raised to the caller.
    # Permanent (HTTP 4xx = bad parameters, ConnectException = RAGFlow unreachable): re-raised
    #   immediately, since retrying cannot succeed.
    attempt = 0
    while true:
        try:
            return ragFlowService.retrieve(...)
        except SocketTimeout or 5xx:
            if attempt >= maxRetries: raise
            attempt++
            sleep(3 seconds)
        except 4xx or ConnectException:
            raise
# ============ 异常处理(单 QA 失败不连带,整组合不中断)============
# SocketTimeout → 日志 + 该 QA 结果置空
# ConnectException → 日志 + 该 QA 结果置空
# HttpServerError → 同上
# HttpClientError 4xx → 同上(参数错就跳过该 QA)
# Exception → 同上
# → 单 QA 失败不影响其他,整批跑完
# ============ 阶段 3:多维 tiebreaker 选最优 ============
function pickBest(allResults):
    # Choose the best retrieval combo with a deterministic multi-key ordering.
    # Without tiebreakers, stream.max returns an arbitrary element among ties and the backend
    # choice disagrees with the UI. The keys below must stay in the same order as the frontend
    # utils/gridSearchRecommend.js.
    # NOTE(review): .first() on an empty list — confirm callers never pass an empty allResults.
    return allResults.sortBy(comparator(a, b):
        # Compare with Double/Integer/Boolean.compare, never by subtraction. The metrics are
        # fractions in [0, 1] and a comparator returns an int, so (b.recallAtK - a.recallAtK)
        # would truncate to 0 and silently treat different scores as ties.
        # ① higher recall first
        c = Double.compare(b.recallAtK, a.recallAtK)
        if c != 0: return c
        # ② higher MRR first
        c = Double.compare(b.mrr, a.mrr)
        if c != 0: return c
        # ③ no rerank first (simpler wins): false sorts before true
        c = Boolean.compare(a.hasRerank, b.hasRerank)
        if c != 0: return c
        # ④ higher threshold first (stricter, more stable)
        c = Double.compare(b.threshold, a.threshold)
        if c != 0: return c
        # ⑤ smaller topK first (less noise)
        c = Integer.compare(a.topK, b.topK)
        if c != 0: return c
        # ⑥ vector weight closest to 0.5 first (balanced vector/keyword)
        return Double.compare(abs(a.weight - 0.5), abs(b.weight - 0.5))
    ).first()
# ============ 阶段 4:saveResults 落库 + 中断恢复 ============
# - 任务状态: running → completed / cancelled / interrupted / failed
# - 服务重启时 PostConstruct 钩子标记残留 running 任务为 interrupted
# - resumeTask 可基于 completedComboKeys 跳过已完成组合,只跑 remainingCombos
function evaluateComboFromCache(mappings, cache, similarityThreshold, vectorWeight, topK, rerankModel):
    # Score one (threshold, weight, topK, rerank) combo from cached full retrieval results.
    #
    # Each cache entry holds the results retrieved with maxTopK and threshold 0. The combo is
    # simulated by keeping results with similarity >= threshold and truncating to topK.
    # Metrics (recall@K, hit@1, MRR) are averaged over the mappings actually evaluated. A mapping
    # whose expected chunk is shorter than 10 characters after whitespace removal is excluded
    # from both numerator and denominator.
    recallSum = 0.0       # number of questions whose expected chunk was found
    hitAt1Count = 0       # number of questions whose expected chunk was ranked first
    mrrSum = 0.0          # sum of reciprocal ranks
    evaluated = 0         # mappings that took part in the evaluation
    for qi, mapping in enumerate(mappings):
        # Expected chunk with ALL whitespace removed (normalizeWhitespace deletes, not collapses).
        expectedContent = normalizeWhitespace(mapping.chunkContent)
        if expectedContent.length < 10:
            continue      # too short to match reliably (noisy); excluded from the averages
        evaluated++
        # Same key format as the retrieval stage: "qIdx_weight_rerank". A missing entry
        # (e.g. failed retrieval) counts as a miss.
        cacheKey = qi + "_" + vectorWeight + "_" + rerankModel
        cachedResults = cache.getOrDefault(cacheKey, [])
        filtered = cachedResults
            .filter(r => r.similarity != null and r.similarity >= similarityThreshold)
            .limit(topK)
        # 1-based rank of the first candidate matching the expected chunk; -1 if none.
        rank = -1
        for i, c in enumerate(filtered):
            if isContentMatch(expectedContent, normalizeWhitespace(c.content)):
                rank = i + 1
                break
        if rank > 0:
            recallSum += 1.0
            mrrSum += 1.0 / rank
            if rank == 1:
                hitAt1Count++
    # Floating-point division (hitAt1Count is an integer count), guarded for zero evaluable mappings.
    total = (double) evaluated
    return ComboResult(
        similarityThreshold = similarityThreshold,
        vectorWeight = vectorWeight,
        topK = topK,
        rerankModel = rerankModel,
        recallAtK = total > 0 ? recallSum / total : 0.0,       # hit rate
        hitAt1 = total > 0 ? hitAt1Count / total : 0.0,        # first-position hit rate
        mrr = total > 0 ? mrrSum / total : 0.0                 # mean reciprocal rank
    )
# ============ ⭐ 内容匹配的关键 helper(三档判定)============
function normalizeWhitespace(text):
    # Delete every whitespace character (rather than collapsing runs to one space), so chunks
    # that differ only in line breaks, indentation or spacing compare as equal strings.
    # A null input yields "".
    return text == null ? "" : text.replaceAll("\\s+", "")
function isContentMatch(expected, actual, minPrefixLen = 50, minPrefixRatio = 0.8):
    # Decide whether candidate chunk text `actual` corresponds to the expected chunk `expected`.
    # Both inputs are expected to be whitespace-stripped (normalizeWhitespace).
    #
    # Three tiers, checked in order:
    #   1. exact equality
    #   2. one string contains the other (different splitters cut at different boundaries)
    #   3. the common prefix covers more than `minPrefixRatio` of the shorter string; enabled
    #      only when the shorter string has more than `minPrefixLen` characters (same passage
    #      cut at a different end point)
    # Defaults (50, 0.8) reproduce the original thresholds.
    if expected == null or actual == null: return false
    # "".contains is true for any text, so an empty side would match every question.
    if expected.isEmpty() or actual.isEmpty(): return false
    if expected.equals(actual): return true
    if expected.contains(actual) or actual.contains(expected): return true
    minLen = min(expected.length, actual.length)
    if minLen > minPrefixLen:
        matchLen = 0
        while matchLen < minLen and expected[matchLen] == actual[matchLen]:
            matchLen++
        # Floating-point ratio: integer division would floor any partial prefix to 0.
        if (double) matchLen / minLen > minPrefixRatio: return true
    return false
# ============ 入口:全局 running 锁 ============
function startEval(sourceIds, faqDatasetIds, sampleSize, threshold, vw, topN, maxTokens, modelName):
    # Start a generation evaluation asynchronously and return the SSE stream reporting its progress.
    # At most one evaluation runs at a time, guarded by the shared `running` flag.
    # runningSince: timestamp recorded when the flag is acquired (assumed to be a shared field,
    # e.g. volatile/AtomicLong — TODO confirm).
    emitter = new SseEmitter(0L)        # 0L = no container timeout; the business flow controls lifetime
    # A flag held for more than 15 minutes is treated as leaked and forcibly released, so a
    # crashed run cannot block evaluations forever.
    if running.get() and now() - runningSince > Duration.ofMinutes(15):
        running.set(false)
    # compareAndSet IS the acquisition: checking get() and then setting separately lets two
    # concurrent requests both see "free" and both start an evaluation.
    if not running.compareAndSet(false, true):
        sendError("已有评测在运行")
        return emitter
    runningSince = now()
    # Register the release hooks before the task starts, so no completion path is missed.
    emitter.onTimeout / onCompletion / onError(() => running.set(false))
    try:
        executor.submit(() => doEval(emitter, ...))
    except RejectedExecutionException:
        running.set(false)              # the task never started; don't keep the flag held
        raise
    return emitter
# ============ 主评测流程 doEval ============
function doEval(emitter, sourceIds, faqDatasetIds, sampleSize, ...):
    # End-to-end generation evaluation:
    #   load FAQs -> persist task + sample snapshot -> create a temporary RAGFlow chat assistant
    #   -> ask every sampled question (3 concurrent) -> RAGAS scoring (3 concurrent)
    #   -> aggregate and persist.
    # Progress is pushed to `emitter`: 15-75% while chatting, 78-98% while scoring, 100 at the end.
    dbTask = null
    try:
        # ① Resolve the RAGFlow dataset of every selected source, then load the FAQs once.
        ragflowDatasetIds = []
        allFaqs = []
        for sourceId in sourceIds:
            ds = dataSourceService.getById(sourceId)
            ragflowDatasetIds.add(ds.ragflowDatasetId)
        # Loaded once: the query depends only on faqDatasetIds, so running it per source
        # would duplicate every FAQ.
        allFaqs.addAll(faqItemMapper.selectByDatasetIds(faqDatasetIds))

        # ② Persist the task as running, then the stratified sample as a snapshot (M3: reproducibility).
        dbTask = evalTaskMapper.insert(GenerationEvalTask{ status: "running", ... })
        samples = stratifiedSample(allFaqs, sampleSize)
        dbTask.evalItemsSnapshot = JSON.stringify(samples)
        totalSamples = samples.size

        # ③ Temporary RAGFlow chat assistant configured with the retrieval parameters under test.
        # NOTE(review): ragflowDatasetIds is collected above but not visibly attached to dto —
        # confirm the assistant is bound to these datasets, and that it is cleaned up after the run.
        dto.prompt.similarityThreshold = threshold
        dto.prompt.keywordsSimilarityWeight = 1.0 - vectorWeight    # RAGFlow takes the KEYWORD weight
        dto.prompt.topN = topN
        dto.prompt.topK = 1024
        dto.prompt.system = LlmPromptConstants.EVAL_ASSISTANT_SYSTEM_PROMPT
        dto.llm.modelName = modelName
        dto.llm.temperature = 0.1           # low temperature reduces randomness
        assistantId = chatAssistantService.createAssistant(dto)

        # ④ Stage 5: ask every sampled question, 3 in flight, 120 s each.
        # Counters and collections are written from pool threads, so they are atomic/concurrent.
        completedCount = AtomicInteger(0)
        skippedCount = AtomicInteger(0)
        evalItems = ConcurrentLinkedQueue()
        chatPool = ContextAwareExecutor.wrap(newFixedThreadPool(3))
        for qa in samples:
            chatPool.submit(() => {
                try:
                    # A fresh session per FAQ so earlier turns cannot influence the answer.
                    session = chatAssistantService.createSession(assistantId, "eval_threadId")
                    chatFuture = CompletableFuture.supplyAsync(() =>
                        chatAssistantService.chat(assistantId, session.id, qa.question)
                    )
                    response = chatFuture.get(120, SECONDS)
                    # Strip RAGFlow citation markers such as [ID:0]; they distort RAGAS scoring.
                    answer = response.answer.replaceAll("\\s*\\[ID:\\d+\\]", "")
                    contexts = response.references.map(r => r.content)
                    if answer.isEmpty(): skippedCount.incrementAndGet(); return
                    # M1 multi-answer: the primary answer plus any alternatives are all ground truths.
                    groundTruths = [qa.answer]
                    if qa.altAnswers:
                        groundTruths.addAll(parseJson(qa.altAnswers))
                    evalItems.add({faqId, question, answer, contexts, groundTruths, metadata})
                    # Success must be counted too, otherwise chat progress never advances.
                    completedCount.incrementAndGet()
                except TimeoutException: skippedCount.incrementAndGet()     # timed out: skip
                except Exception: skippedCount.incrementAndGet()            # isolate the failure
            })
        # A single scheduler thread pushes progress every 2 s (never from the workers, so it
        # cannot go backwards). Integer-safe ratio; an empty sample maps straight to 75%.
        chatProgressScheduler.scheduleAtFixedRate(() => {
            done = completedCount.get() + skippedCount.get()
            progress = 15 + done * 60 / max(1, totalSamples)      # 15-75%
            sendProgress(emitter, progress)
        }, 2, 2, SECONDS)
        chatPool.shutdown(); wait until every chat task has finished
        chatProgressScheduler.shutdown()

        # ⑤ Stage 6: RAGAS scoring, 3 concurrent.
        # ragasService calls the LLM several times, and tokenUsageService needs the user_id for
        # billing, so the nested async work must run on a UserContext-wrapped pool.
        allResults = ConcurrentLinkedQueue()
        ragasFail = AtomicInteger(0)
        ragasDone = AtomicInteger(0)
        ragasPool = ContextAwareExecutor.wrap(newFixedThreadPool(3))
        ragasFutures = evalItems.map(item =>
            CompletableFuture.runAsync(() => {
                try:
                    score = ragasService.evaluate(
                        question = item.question,
                        answer = item.answer,
                        contexts = item.contexts,
                        groundTruths = item.groundTruths)
                    allResults.add({
                        faqId, question, answer,
                        ground_truth = groundTruths[0],                 # for low-score diagnosis
                        contexts,
                        faithfulness = score.faithfulness,              # faithfulness
                        answer_relevancy = score.answerRelevancy,       # answer relevancy
                        answer_correctness = score.answerCorrectness,   # factual correctness
                        metadata = item.metadata                        # M2 per-dimension aggregation
                    })
                except Exception: ragasFail.incrementAndGet()
                finally: ragasDone.incrementAndGet()
            }, ragasPool)
        )
        CompletableFuture.allOf(ragasFutures).join()
        # Same fixed-rate progress push as stage 5, mapped to 78-98%.
        ragasProgressScheduler.shutdown(); ragasPool.shutdown()

        # ⑥ Aggregate and persist. An average over no results is undefined, so 0.0 is reported.
        avgFaithfulness = allResults.empty() ? 0.0 : allResults.avg(r => r.faithfulness)
        avgRelevancy = allResults.empty() ? 0.0 : allResults.avg(r => r.answer_relevancy)
        avgCorrectness = allResults.empty() ? 0.0 : allResults.avg(r => r.answer_correctness)
        # Low-score samples (any metric < 0.6) are kept for diagnosis.
        lowScoreItems = allResults.filter(r =>
            r.faithfulness<0.6 or r.answer_relevancy<0.6 or r.answer_correctness<0.6)
        # M2: metrics aggregated per metadata dimension (difficulty / question_type).
        metadataBreakdown = buildMetadataBreakdown(allResults)
        dbTask.update({status: "completed", ...})
        sendProgress(emitter, "done", ..., 100)
        # Completing the emitter fires startEval's onCompletion hook, which releases `running`;
        # otherwise the flag stays set until the 15-minute forced release.
        # NOTE(review): if sendProgress(..., "done", ...) already completes the emitter, drop this call.
        emitter.complete()
    except Exception as e:
        # Never leave the task stuck in "running"; completeWithError also releases `running`.
        if dbTask != null: dbTask.update({status: "failed", ...})
        emitter.completeWithError(e)
# ============ ⭐⭐ ragasService.evaluate 内部:RagasJavaService 真实实现 ============
# 注意:不是 Python RAGAS,是 Java 重写版(基于 RAGAS 论文算法)
# 三个指标无依赖 → CompletableFuture 并行计算,各 120s 超时
function evaluate(question, answer, contexts, groundTruths):
    # Compute the three RAGAS-style scores (Java re-implementation of the RAGAS paper's metrics).
    # LLM calls go to ragasModel, which may be cheaper or faster than the main LLM.
    # The metrics do not depend on each other, so they run in parallel. A metric that throws or
    # misses the deadline scores 0.0, and the other two keep their real values.
    fFuture = async(evaluateFaithfulness(question, answer, contexts))
    rFuture = async(evaluateAnswerRelevancy(question, answer))
    cFuture = async(evaluateAnswerCorrectnessMulti(question, answer, groundTruths))
    # One shared 120 s deadline. Because the metrics run concurrently, waiting a fresh 120 s on
    # each future in turn could stretch the total wait to 360 s.
    deadline = now() + 120 seconds
    return EvalScore(
        scoreOrZero(fFuture, deadline),
        scoreOrZero(rFuture, deadline),
        scoreOrZero(cFuture, deadline))

# Wait for `future` until `deadline`; return 0.0 if it times out or completes exceptionally.
function scoreOrZero(future, deadline):
    try:
        return future.get(max(0, deadline - now()))
    except Exception:
        return 0.0
# ============ 指标 1:faithfulness 忠实度(陈述拆解 + NLI 验证) ============
function evaluateFaithfulness(question, answer, contexts):
    # Faithfulness = share of the answer's atomic statements that the contexts support (0.0-1.0).
    #
    # Step 1: the LLM splits the answer into standalone factual statements, e.g.
    #   answer = "水库 X 库容 1 亿,2020 年建成"
    #   -> statements = ["水库 X 库容 1 亿", "水库 X 2020 年建成"]
    statements = extractStatements(question, answer)    # RAGAS_STATEMENT_EXTRACTION prompt
    # Nothing to verify: return 0.0 (the value used for failed metrics) instead of dividing by zero.
    if statements == null or statements.empty(): return 0.0
    # Step 2: NLI check — the LLM judges, per statement, whether it follows from the contexts.
    prompt = RAGAS_NLI_VERIFICATION
        .replace("{context}", contexts.join("\n\n"))
        .replace("{statements}", JSON(statements))
    response = chat(prompt)
    verdicts = parseJsonArray(response)     # [{verdict: 0 or 1, reason: "..."}]
    supported = verdicts.count(v => v.verdict == 1)
    # Step 3: score = supported / total statements. The LLM may return more verdicts than
    # statements, so cap the count to keep the score within [0, 1].
    return min(supported, statements.size) / (double) statements.size
# ============ 指标 2:answerRelevancy 答案相关性(LLM-as-a-Judge) ============
function evaluateAnswerRelevancy(question, answer):
    # Answer relevancy by LLM-as-a-judge: the model rates on a 1-10 scale how well `answer`
    # addresses `question`, and the rating is mapped into [0, 1].
    # This replaces RAGAS's original "regenerate questions from the answer" method, which costs more.
    filledPrompt = RAGAS_ANSWER_RELEVANCY.replace("{question}", question).replace("{answer}", answer)
    rawScore = parseJsonObject(chat(filledPrompt)).score      # the LLM replies e.g. {score: 8}
    normalized = rawScore / 10.0
    return clamp(normalized, 0.0, 1.0)
# ============ 指标 3:answerCorrectness 事实正确性(F1 分数) ============
function evaluateAnswerCorrectnessMulti(question, answer, groundTruths):
    # Answer correctness against several acceptable reference answers.
    # Equivalent phrasings are all valid, so the answer gets its best score over all references.
    # Scanning stops as soon as a reference scores >= 0.99, which saves LLM calls.
    # Returns 0.0 when groundTruths is empty.
    bestScore = 0.0
    idx = 0
    while idx < groundTruths.size and bestScore < 0.99:
        candidate = evaluateAnswerCorrectness(question, answer, groundTruths[idx])
        if candidate > bestScore:
            bestScore = candidate
        idx++
    return bestScore
function evaluateAnswerCorrectness(question, answer, groundTruth):
    # Factual correctness as the F1 score of statement overlap between answer and reference.
    #
    # Step 1: split the answer and the reference into statements separately.
    answerStmts = extractStatements(question, answer)
    gtStmts = extractStatements(question, groundTruth)
    # Step 2: the LLM classifies statements:
    #   TP = in the answer and in the reference (correct)
    #   FP = in the answer but not in the reference (fabricated)
    #   FN = in the reference but not in the answer (missed)
    classification = chat(RAGAS_CORRECTNESS_CLASSIFICATION
        .replace("{answer_statements}", JSON(answerStmts))
        .replace("{ground_truth_statements}", JSON(gtStmts)))
    tp, fp, fn = classification.TP/FP/FN array lengths
    # Step 3: F1 = 2PR/(P+R), rewritten as 2TP/(2TP+FP+FN). This is algebraically identical when
    # TP > 0 and avoids the 0/0 cases (tp+fp == 0, tp+fn == 0, P+R == 0) when TP == 0.
    denominator = 2 × tp + fp + fn
    if tp == 0 or denominator == 0: return 0.0
    return 2.0 × tp / denominator
# ============ 共用工具:extractStatements 陈述拆解 ============
function extractStatements(question, text):
    # Ask the LLM to break `text` (written in response to `question`) into standalone factual
    # statements. Returns the "statements" array of its JSON reply, e.g. ["fact 1", "fact 2", ...].
    filledPrompt = RAGAS_STATEMENT_EXTRACTION
        .replace("{question}", question)
        .replace("{answer}", text)
    reply = chat(filledPrompt)
    parsed = parseJsonObject(reply)
    return parsed.statements
# ============ 异常兜底 ============
# 三个指标任意一个抛异常 → 该指标返回 0.0,不影响其他两个
# 整体并行 evaluate() 异常 → 已完成的取真实值,未完成的返回 0.0