🎯 派洞察——科研文献智能 RAG 知识库系统

📁 文件上传模块

模块定位:文件上传模块是系统的核心功能模块,负责处理科研文献文件的上传、分片处理、类型验证和权限控制。

🎯 核心目标

  • 高效上传:支持大文件分片上传,提供上传进度反馈
  • 类型验证:严格的文件类型验证,确保上传文档符合系统要求
  • 权限控制:基于组织标签和公开权限的文件访问控制
  • 异步处理:通过 Kafka 消息队列实现文件的异步处理
  • 数据安全:完整的权限验证和数据隔离机制

📊 一、功能需求

📤 文件分片上传

📝 功能描述:支持大文件的分片上传,客户端可将大文件分割成多个分片依次上传,服务端负责分片的接收和存储。

进度反馈:实时返回已上传分片列表和上传进度百分比。

🔍 类型验证:在第一个分片上传时进行文件类型验证,确保文件类型符合系统要求。

📋 上传状态查询

🔍 状态查询:客户端可查询指定文件的上传状态,包括已上传分片列表、总进度、文件名和文件类型等信息。

📊 进度计算:服务端根据已上传分片数量和总分片数量计算上传进度百分比。

🔗 文件分片合并

🧩 分片合并:当所有分片上传完成后,客户端可触发文件合并操作,服务端将分片合并成完整文件。

🔒 权限验证:合并前验证用户对该文件的操作权限,确保只有文件所有者才能合并文件。

📋 完整性检查:检查所有分片是否已上传完成,未完成则拒绝合并请求。

🏷️ 组织标签与权限

🆔 自动标签:如未指定组织标签,自动使用用户的主组织标签(primaryOrg)。

🔓 公开权限:支持设置文件公开权限(isPublic),公开文件不受组织标签限制。

🛡️ 权限继承:文件处理任务包含完整的权限信息(用户ID、组织标签、公开权限)。

📄 文件类型验证

类型检查:系统仅支持特定类型的文档文件(如PDF、Word、TXT等)。

🚫 拒绝机制:不支持的文件类型将被拒绝上传,并返回详细的错误信息和支持的文件类型列表。

📋 扩展支持:提供接口获取系统支持的文件类型和扩展名列表。

🚀 异步任务处理

📨 Kafka集成:文件合并成功后,发送任务到 Kafka 消息队列进行异步处理。

🔄 事务保证:使用 Kafka 事务确保消息发送的可靠性。

📊 任务信息:任务包含文件MD5、对象URL、文件名、用户ID、组织标签和公开权限等完整信息。

🛠️ 二、技术方案

📤 分片上传技术

🎯 分片策略:客户端将大文件分割成固定大小的分片(如5MB),依次上传每个分片。

🗂️ 分片存储:服务端将分片临时存储,记录已上传分片索引。

📊 进度跟踪:维护分片上传状态,实时计算和返回上传进度。

🔐 身份认证与授权

🛡️ JWT认证:基于 JWT token 验证用户身份,确保上传操作的安全性。

👤 用户属性:通过 @RequestAttribute(“userId”) 获取当前用户ID,由拦截器提前解析。

🔒 权限控制:基于用户ID验证文件操作权限,防止越权访问。

📊 数据持久化

🗄️ MySQL:存储文件上传记录、文件元数据和权限信息。

Redis:缓存上传状态信息,提高查询性能。

🚀 Spring Data JPA:简化数据库操作,提供文件记录的CRUD操作。

🚀 消息队列集成

📨 Kafka配置:通过 KafkaConfig 获取文件处理主题名称。

🔄 事务处理:使用 kafkaTemplate.executeInTransaction 确保消息发送的事务性。

📋 任务模型:FileProcessingTask 包含文件处理所需的完整信息。

🧰 辅助工具

📊 性能监控:LogUtils.PerformanceMonitor 监控接口性能。

📝 业务日志:LogUtils.logBusiness 记录业务操作日志。

⚠️ 错误日志:LogUtils.logBusinessError 记录错误信息和异常。

📁 文件工具:getFileType 方法提取文件类型,支持常见文档格式。

📝 三、关键流程

📤 1. 文件分片上传

🔄 流程步骤

  1. 📨 接收请求:客户端发送分片上传请求,包含文件MD5、分片索引、文件信息等参数
  2. 类型验证:第一个分片进行文件类型验证,检查文件扩展名和内容类型
  3. 🏷️ 组织标签:如未指定orgTag,获取用户主组织标签
  4. 💾 分片存储:调用uploadService.uploadChunk存储分片数据
  5. 📊 状态查询:查询已上传分片列表和总分片数量
  6. 🧮 进度计算:计算并返回上传进度百分比
  7. 📤 返回结果:返回统一格式的响应数据

接口设计

请求URL

1
POST /api/v1/upload/chunk

请求参数

1
2
3
4
5
6
7
8
fileMd5: string          // 文件的MD5值,用于唯一标识文件
chunkIndex: int // 分片索引,表示当前分片的位置
totalSize: long // 文件总大小
fileName: string // 文件名
totalChunks: int // 总分片数量(可选)
orgTag: string // 组织标签,如果未指定则使用用户的主组织标签(可选)
isPublic: boolean // 是否公开,默认为false(可选)
file: MultipartFile // 分片文件对象

成功响应

1
2
3
4
5
6
7
8
{
"code": 200,
"message": "分片上传成功",
"data": {
"uploaded": [0, 1, 2],
"progress": 60.0
}
}

失败响应

1
2
3
4
5
6
{
"code": 400,
"message": "不支持的文件类型: .exe",
"fileType": "EXECUTABLE",
"supportedTypes": ["PDF", "DOCX", "TXT"]
}

📊 2. 获取文件上传状态

🔄 流程步骤

  1. 🔍 接收参数:获取文件MD5和用户ID参数
  2. 📋 查询信息:查询文件上传记录和已上传分片信息
  3. 🧮 计算进度:根据已上传分片计算上传进度
  4. 📤 返回状态:返回文件状态、进度和基本信息

接口设计

请求URL

1
GET /api/v1/upload/status?file_md5={fileMd5}

成功响应

1
2
3
4
5
6
7
8
9
10
{
"code": 200,
"message": "获取上传状态成功",
"data": {
"uploaded": [0, 1, 2, 3, 4],
"progress": 100.0,
"fileName": "research_paper.pdf",
"fileType": "PDF"
}
}

失败响应

1
2
3
4
{
"code": 500,
"message": "获取文件上传状态失败: 数据库连接异常"
}

🔗 3. 合并文件分片

🔄 流程步骤

  1. 📨 接收请求:接收文件合并请求,包含文件MD5和文件名
  2. 🔒 权限验证:验证用户对该文件的操作权限
  3. 📋 完整性检查:检查所有分片是否已上传完成
  4. 🧩 文件合并:调用uploadService.mergeChunks合并分片
  5. 📨 发送任务:构建FileProcessingTask并发送到Kafka
  6. 📤 返回结果:返回合并成功和文件访问URL

接口设计

请求URL

1
POST /api/v1/upload/merge

请求体(JSON)

1
2
3
4
{
"fileMd5": "abc123def456",
"fileName": "research_paper.pdf"
}

成功响应

1
2
3
4
5
6
7
{
"code": 200,
"message": "文件合并成功,任务已发送到 Kafka",
"data": {
"object_url": "https://storage.example.com/files/abc123def456.pdf"
}
}

失败响应

1
2
3
4
{
"code": 403,
"message": "没有权限操作此文件"
}

📋 4. 获取支持的文件类型

🔄 流程步骤

  1. 📋 查询类型:调用fileTypeValidationService获取支持的文件类型
  2. 📊 构建数据:构建包含文件类型、扩展名和描述信息的响应数据
  3. 📤 返回结果:返回支持的文件类型列表

接口设计

请求URL

1
GET /api/v1/upload/supported-types

成功响应

1
2
3
4
5
6
7
8
9
{
"code": 200,
"message": "获取支持的文件类型成功",
"data": {
"supportedTypes": ["PDF", "DOCX", "DOC", "TXT", "PPTX"],
"supportedExtensions": [".pdf", ".docx", ".doc", ".txt", ".pptx"],
"description": "系统支持的文档类型文件,这些文件可以被解析并进行向量化处理"
}
}

失败响应

1
2
3
4
{
"code": 500,
"message": "获取支持的文件类型失败"
}

🗃️ 四、库表设计

📁 1. 文件上传记录表

字段设计

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
| 🔑 字段 | 📋 类型 | 📝 描述 |
| --- | --- | --- |
| `id` | `BIGINT` | 🆔 主键,文件上传记录 ID |
| `file_md5` | `VARCHAR(32)` | 🔤 文件MD5值,唯一标识 |
| `file_name` | `VARCHAR(255)` | 📄 原始文件名 |
| `file_size` | `BIGINT` | 📊 文件大小(字节) |
| `file_type` | `VARCHAR(50)` | 📋 文件类型 |
| `user_id` | `VARCHAR(50)` | 👤 上传用户ID |
| `org_tag` | `VARCHAR(50)` | 🏷️ 组织标签 |
| `is_public` | `BOOLEAN` | 🌍 是否公开 |
| `chunk_count` | `INT` | 🧩 总分片数量 |
| `uploaded_chunks` | `TEXT` | 📋 已上传分片索引(JSON数组) |
| `status` | `ENUM` | 📊 上传状态 |
| `object_url` | `VARCHAR(500)` | 🔗 文件对象存储URL |
| `created_at` | `TIMESTAMP` | 📅 创建时间 |
| `updated_at` | `TIMESTAMP` | 🔄 更新时间 |

建表语句

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
CREATE TABLE file_uploads (
id BIGINT AUTO_INCREMENT PRIMARY KEY COMMENT '文件上传记录唯一标识',
file_md5 VARCHAR(32) NOT NULL UNIQUE COMMENT '文件MD5值,唯一标识',
file_name VARCHAR(255) NOT NULL COMMENT '原始文件名',
file_size BIGINT NOT NULL COMMENT '文件大小(字节)',
file_type VARCHAR(50) COMMENT '文件类型',
user_id VARCHAR(50) NOT NULL COMMENT '上传用户ID',
org_tag VARCHAR(50) COMMENT '组织标签',
is_public BOOLEAN DEFAULT FALSE COMMENT '是否公开',
chunk_count INT DEFAULT 1 COMMENT '总分片数量',
uploaded_chunks TEXT COMMENT '已上传分片索引(JSON数组)',
status ENUM('UPLOADING', 'COMPLETED', 'FAILED') DEFAULT 'UPLOADING' COMMENT '上传状态',
object_url VARCHAR(500) COMMENT '文件对象存储URL',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
INDEX idx_file_md5 (file_md5) COMMENT '文件MD5索引',
INDEX idx_user_id (user_id) COMMENT '用户ID索引',
INDEX idx_status (status) COMMENT '状态索引'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='文件上传记录表';

📄 2. 文件处理任务表

字段设计

1
2
3
4
5
6
7
8
9
10
11
12
13
14
| 🔑 字段 | 📋 类型 | 📝 描述 |
| --- | --- | --- |
| `task_id` | `VARCHAR(50)` | 🆔 主键,任务ID |
| `file_md5` | `VARCHAR(32)` | 🔤 文件MD5值 |
| `object_url` | `VARCHAR(500)` | 🔗 文件对象存储URL |
| `file_name` | `VARCHAR(255)` | 📄 文件名 |
| `user_id` | `VARCHAR(50)` | 👤 用户ID |
| `org_tag` | `VARCHAR(50)` | 🏷️ 组织标签 |
| `is_public` | `BOOLEAN` | 🌍 是否公开 |
| `status` | `ENUM` | 📊 任务状态 |
| `retry_count` | `INT` | 🔁 重试次数 |
| `error_message` | `TEXT` | ⚠️ 错误信息 |
| `created_at` | `TIMESTAMP` | 📅 创建时间 |
| `updated_at` | `TIMESTAMP` | 🔄 更新时间 |

建表语句

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
CREATE TABLE file_processing_tasks (
task_id VARCHAR(50) PRIMARY KEY COMMENT '任务唯一标识',
file_md5 VARCHAR(32) NOT NULL COMMENT '文件MD5值',
object_url VARCHAR(500) NOT NULL COMMENT '文件对象存储URL',
file_name VARCHAR(255) NOT NULL COMMENT '文件名',
user_id VARCHAR(50) NOT NULL COMMENT '用户ID',
org_tag VARCHAR(50) COMMENT '组织标签',
is_public BOOLEAN DEFAULT FALSE COMMENT '是否公开',
status ENUM('PENDING', 'PROCESSING', 'COMPLETED', 'FAILED') DEFAULT 'PENDING' COMMENT '任务状态',
retry_count INT DEFAULT 0 COMMENT '重试次数',
error_message TEXT COMMENT '错误信息',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
INDEX idx_file_md5 (file_md5) COMMENT '文件MD5索引',
INDEX idx_user_id (user_id) COMMENT '用户ID索引',
INDEX idx_status (status) COMMENT '状态索引'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='文件处理任务表';