🎯 派洞察——科研文献智能 RAG 知识库系统
📁 文件上传模块
模块定位:文件上传模块是系统的核心功能模块,负责处理科研文献文件的上传、分片处理、类型验证和权限控制。
🎯 核心目标
- ✅ 高效上传:支持大文件分片上传,提供上传进度反馈
- ✅ 类型验证:严格的文件类型验证,确保上传文档符合系统要求
- ✅ 权限控制:基于组织标签和公开权限的文件访问控制
- ✅ 异步处理:通过 Kafka 消息队列实现文件的异步处理
- ✅ 数据安全:完整的权限验证和数据隔离机制
📊 一、功能需求
📤 文件分片上传
📝 功能描述:支持大文件的分片上传,客户端可将大文件分割成多个分片依次上传,服务端负责分片的接收和存储。
⚡ 进度反馈:实时返回已上传分片列表和上传进度百分比。
🔍 类型验证:在第一个分片上传时进行文件类型验证,确保文件类型符合系统要求。
📋 上传状态查询
🔍 状态查询:客户端可查询指定文件的上传状态,包括已上传分片列表、总进度、文件名和文件类型等信息。
📊 进度计算:服务端根据已上传分片数量和总分片数量计算上传进度百分比。
🔗 文件分片合并
🧩 分片合并:当所有分片上传完成后,客户端可触发文件合并操作,服务端将分片合并成完整文件。
🔒 权限验证:合并前验证用户对该文件的操作权限,确保只有文件所有者才能合并文件。
📋 完整性检查:检查所有分片是否已上传完成,未完成则拒绝合并请求。
🏷️ 组织标签与权限
🆔 自动标签:如未指定组织标签,自动使用用户的主组织标签(primaryOrg)。
🔓 公开权限:支持设置文件公开权限(isPublic),公开文件不受组织标签限制。
🛡️ 权限继承:文件处理任务包含完整的权限信息(用户ID、组织标签、公开权限)。
📄 文件类型验证
✅ 类型检查:系统仅支持特定类型的文档文件(如PDF、Word、TXT等)。
🚫 拒绝机制:不支持的文件类型将被拒绝上传,并返回详细的错误信息和支持的文件类型列表。
📋 扩展支持:提供接口获取系统支持的文件类型和扩展名列表。
🚀 异步任务处理
📨 Kafka集成:文件合并成功后,发送任务到 Kafka 消息队列进行异步处理。
🔄 事务保证:使用 Kafka 事务确保消息发送的可靠性。
📊 任务信息:任务包含文件MD5、对象URL、文件名、用户ID、组织标签和公开权限等完整信息。
🛠️ 二、技术方案
📤 分片上传技术
🎯 分片策略:客户端将大文件分割成固定大小的分片(如5MB),依次上传每个分片。
🗂️ 分片存储:服务端将分片临时存储,记录已上传分片索引。
📊 进度跟踪:维护分片上传状态,实时计算和返回上传进度。
🔐 身份认证与授权
🛡️ JWT认证:基于 JWT token 验证用户身份,确保上传操作的安全性。
👤 用户属性:通过 @RequestAttribute(“userId”) 获取当前用户ID,由拦截器提前解析。
🔒 权限控制:基于用户ID验证文件操作权限,防止越权访问。
📊 数据持久化
🗄️ MySQL:存储文件上传记录、文件元数据和权限信息。
⚡ Redis:缓存上传状态信息,提高查询性能。
🚀 Spring Data JPA:简化数据库操作,提供文件记录的CRUD操作。
🚀 消息队列集成
📨 Kafka配置:通过 KafkaConfig 获取文件处理主题名称。
🔄 事务处理:使用 kafkaTemplate.executeInTransaction 确保消息发送的事务性。
📋 任务模型:FileProcessingTask 包含文件处理所需的完整信息。
🧰 辅助工具
📊 性能监控:LogUtils.PerformanceMonitor 监控接口性能。
📝 业务日志:LogUtils.logBusiness 记录业务操作日志。
⚠️ 错误日志:LogUtils.logBusinessError 记录错误信息和异常。
📁 文件工具:getFileType 方法提取文件类型,支持常见文档格式。
📝 三、关键流程
📤 1. 文件分片上传
🔄 流程步骤
- 📨 接收请求:客户端发送分片上传请求,包含文件MD5、分片索引、文件信息等参数
- ✅ 类型验证:第一个分片进行文件类型验证,检查文件扩展名和内容类型
- 🏷️ 组织标签:如未指定orgTag,获取用户主组织标签
- 💾 分片存储:调用uploadService.uploadChunk存储分片数据
- 📊 状态查询:查询已上传分片列表和总分片数量
- 🧮 进度计算:计算并返回上传进度百分比
- 📤 返回结果:返回统一格式的响应数据
接口设计
请求URL
1
| POST /api/v1/upload/chunk
|
请求参数
1 2 3 4 5 6 7 8
| fileMd5: string // 文件的MD5值,用于唯一标识文件 chunkIndex: int // 分片索引,表示当前分片的位置 totalSize: long // 文件总大小 fileName: string // 文件名 totalChunks: int // 总分片数量(可选) orgTag: string // 组织标签,如果未指定则使用用户的主组织标签(可选) isPublic: boolean // 是否公开,默认为false(可选) file: MultipartFile // 分片文件对象
|
成功响应
1 2 3 4 5 6 7 8
| { "code": 200, "message": "分片上传成功", "data": { "uploaded": [0, 1, 2], "progress": 60.0 } }
|
失败响应
1 2 3 4 5 6
| { "code": 400, "message": "不支持的文件类型: .exe", "fileType": "EXECUTABLE", "supportedTypes": ["PDF", "DOCX", "TXT"] }
|
📊 2. 获取文件上传状态
🔄 流程步骤
- 🔍 接收参数:获取文件MD5和用户ID参数
- 📋 查询信息:查询文件上传记录和已上传分片信息
- 🧮 计算进度:根据已上传分片计算上传进度
- 📤 返回状态:返回文件状态、进度和基本信息
接口设计
请求URL
1
| GET /api/v1/upload/status?file_md5={fileMd5}
|
成功响应
1 2 3 4 5 6 7 8 9 10
| { "code": 200, "message": "获取上传状态成功", "data": { "uploaded": [0, 1, 2, 3, 4], "progress": 100.0, "fileName": "research_paper.pdf", "fileType": "PDF" } }
|
失败响应
1 2 3 4
| { "code": 500, "message": "获取文件上传状态失败: 数据库连接异常" }
|
🔗 3. 合并文件分片
🔄 流程步骤
- 📨 接收请求:接收文件合并请求,包含文件MD5和文件名
- 🔒 权限验证:验证用户对该文件的操作权限
- 📋 完整性检查:检查所有分片是否已上传完成
- 🧩 文件合并:调用uploadService.mergeChunks合并分片
- 📨 发送任务:构建FileProcessingTask并发送到Kafka
- 📤 返回结果:返回合并成功和文件访问URL
接口设计
请求URL
1
| POST /api/v1/upload/merge
|
请求体(JSON)
1 2 3 4
| { "fileMd5": "abc123def456", "fileName": "research_paper.pdf" }
|
成功响应
1 2 3 4 5 6 7
| { "code": 200, "message": "文件合并成功,任务已发送到 Kafka", "data": { "object_url": "https://storage.example.com/files/abc123def456.pdf" } }
|
失败响应
1 2 3 4
| { "code": 403, "message": "没有权限操作此文件" }
|
📋 4. 获取支持的文件类型
🔄 流程步骤
- 📋 查询类型:调用fileTypeValidationService获取支持的文件类型
- 📊 构建数据:构建包含文件类型、扩展名和描述信息的响应数据
- 📤 返回结果:返回支持的文件类型列表
接口设计
请求URL
1
| GET /api/v1/upload/supported-types
|
成功响应
1 2 3 4 5 6 7 8 9
| { "code": 200, "message": "获取支持的文件类型成功", "data": { "supportedTypes": ["PDF", "DOCX", "DOC", "TXT", "PPTX"], "supportedExtensions": [".pdf", ".docx", ".doc", ".txt", ".pptx"], "description": "系统支持的文档类型文件,这些文件可以被解析并进行向量化处理" } }
|
失败响应
1 2 3 4
| { "code": 500, "message": "获取支持的文件类型失败" }
|
🗃️ 四、库表设计
📁 1. 文件上传记录表
字段设计
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| | 🔑 字段 | 📋 类型 | 📝 描述 | | --- | --- | --- | | `id` | `BIGINT` | 🆔 主键,文件上传记录 ID | | `file_md5` | `VARCHAR(32)` | 🔤 文件MD5值,唯一标识 | | `file_name` | `VARCHAR(255)` | 📄 原始文件名 | | `file_size` | `BIGINT` | 📊 文件大小(字节) | | `file_type` | `VARCHAR(50)` | 📋 文件类型 | | `user_id` | `VARCHAR(50)` | 👤 上传用户ID | | `org_tag` | `VARCHAR(50)` | 🏷️ 组织标签 | | `is_public` | `BOOLEAN` | 🌍 是否公开 | | `chunk_count` | `INT` | 🧩 总分片数量 | | `uploaded_chunks` | `TEXT` | 📋 已上传分片索引(JSON数组) | | `status` | `ENUM` | 📊 上传状态 | | `object_url` | `VARCHAR(500)` | 🔗 文件对象存储URL | | `created_at` | `TIMESTAMP` | 📅 创建时间 | | `updated_at` | `TIMESTAMP` | 🔄 更新时间 |
|
建表语句
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| CREATE TABLE file_uploads ( id BIGINT AUTO_INCREMENT PRIMARY KEY COMMENT '文件上传记录唯一标识', file_md5 VARCHAR(32) NOT NULL UNIQUE COMMENT '文件MD5值,唯一标识', file_name VARCHAR(255) NOT NULL COMMENT '原始文件名', file_size BIGINT NOT NULL COMMENT '文件大小(字节)', file_type VARCHAR(50) COMMENT '文件类型', user_id VARCHAR(50) NOT NULL COMMENT '上传用户ID', org_tag VARCHAR(50) COMMENT '组织标签', is_public BOOLEAN DEFAULT FALSE COMMENT '是否公开', chunk_count INT DEFAULT 1 COMMENT '总分片数量', uploaded_chunks TEXT COMMENT '已上传分片索引(JSON数组)', status ENUM('UPLOADING', 'COMPLETED', 'FAILED') DEFAULT 'UPLOADING' COMMENT '上传状态', object_url VARCHAR(500) COMMENT '文件对象存储URL', created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间', INDEX idx_file_md5 (file_md5) COMMENT '文件MD5索引', INDEX idx_user_id (user_id) COMMENT '用户ID索引', INDEX idx_status (status) COMMENT '状态索引' ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='文件上传记录表';
|
📄 2. 文件处理任务表
字段设计
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| | 🔑 字段 | 📋 类型 | 📝 描述 | | --- | --- | --- | | `task_id` | `VARCHAR(50)` | 🆔 主键,任务ID | | `file_md5` | `VARCHAR(32)` | 🔤 文件MD5值 | | `object_url` | `VARCHAR(500)` | 🔗 文件对象存储URL | | `file_name` | `VARCHAR(255)` | 📄 文件名 | | `user_id` | `VARCHAR(50)` | 👤 用户ID | | `org_tag` | `VARCHAR(50)` | 🏷️ 组织标签 | | `is_public` | `BOOLEAN` | 🌍 是否公开 | | `status` | `ENUM` | 📊 任务状态 | | `retry_count` | `INT` | 🔁 重试次数 | | `error_message` | `TEXT` | ⚠️ 错误信息 | | `created_at` | `TIMESTAMP` | 📅 创建时间 | | `updated_at` | `TIMESTAMP` | 🔄 更新时间 |
|
建表语句
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| CREATE TABLE file_processing_tasks ( task_id VARCHAR(50) PRIMARY KEY COMMENT '任务唯一标识', file_md5 VARCHAR(32) NOT NULL COMMENT '文件MD5值', object_url VARCHAR(500) NOT NULL COMMENT '文件对象存储URL', file_name VARCHAR(255) NOT NULL COMMENT '文件名', user_id VARCHAR(50) NOT NULL COMMENT '用户ID', org_tag VARCHAR(50) COMMENT '组织标签', is_public BOOLEAN DEFAULT FALSE COMMENT '是否公开', status ENUM('PENDING', 'PROCESSING', 'COMPLETED', 'FAILED') DEFAULT 'PENDING' COMMENT '任务状态', retry_count INT DEFAULT 0 COMMENT '重试次数', error_message TEXT COMMENT '错误信息', created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间', INDEX idx_file_md5 (file_md5) COMMENT '文件MD5索引', INDEX idx_user_id (user_id) COMMENT '用户ID索引', INDEX idx_status (status) COMMENT '状态索引' ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='文件处理任务表';
|