Python 实现在线视频播放完整方案:从后端服务到前端适配
一、技术选型
1. 核心技术栈
后端:Python 3.8+(稳定兼容)、FastAPI(高性能异步 Web 框架,处理并发请求)、Flask(可选,轻量易上手)、uvicorn(ASGI 服务器,支持异步)
前端:HTML5 Video 原生 API、Video.js(增强播放体验,支持 HLS/DASH)
存储方案:
本地存储(开发 / 测试环境):服务器磁盘直接存储
云存储(生产环境):阿里云 OSS、腾讯云 COS、AWS S3(高可用、支持 CDN 加速)
传输协议:
HTTP 分片传输(基础方案,支持断点续传)
HLS(HTTP Live Streaming,适配移动端 + 弱网环境)
辅助工具:FFmpeg(视频转码,统一格式)、Redis(可选,缓存视频元信息 / 上传状态)
2. 关键技术说明
FastAPI:异步非阻塞框架,性能接近 Node.js,原生支持数据校验和 OpenAPI 文档,处理大文件上传和流式传输效率更高。
Video.js:开源免费的跨浏览器播放器,支持 MP4、WebM、HLS(.m3u8)等格式,自带进度条、倍速、全屏等控件,可自定义皮肤和功能扩展。
分片传输:通过解析 HTTP Range 请求头,将大视频文件分割为小片段(如 2MB / 片),前端按需加载,避免一次性加载大文件导致的卡顿和超时。
视频转码:利用 FFmpeg 将原始视频(AVI、MKV、MOV 等)转为 MP4(兼容性最优)或 HLS 格式(.m3u8 索引文件 + ts 分片文件),解决不同设备和浏览器的格式兼容问题。
异步处理:视频上传和转码属于耗时操作,通过 Python 异步任务(如 Celery、FastAPI 背景任务)处理,避免阻塞主线程,提升系统并发能力。
二、后端实现(FastAPI + 异步处理)
1. 环境准备
安装 Python 3.8+,执行以下命令安装依赖:
bash运行pip install fastapi uvicorn python-multipart aiofiles ffmpeg-python redis # aiofiles:异步文件操作
安装 FFmpeg:
yum install ffmpeg(CentOS)、brew install ffmpeg(Mac)、官网下载(Windows)(可选)安装 Redis:用于缓存上传状态和视频元信息
2. 项目结构
python-video-player/ ├── app/ │ ├── __init__.py │ ├── main.py # 主程序(接口路由) │ ├── config.py # 配置文件(存储路径、端口等) │ ├── services/ # 业务逻辑(上传、播放、转码) │ │ ├── upload.py │ │ ├── play.py │ │ └── transcode.py │ └── utils/ # 工具函数(文件处理、权限验证) │ ├── file_utils.py │ └── auth_utils.py └── video-storage/ # 视频存储目录(自动创建)
3. 核心配置(config.py)
import os# 服务器配置HOST = "0.0.0.0"PORT = 8000# 视频存储配置BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))VIDEO_STORAGE_PATH = os.path.join(BASE_DIR, "video-storage/")TEMP_SHARD_PATH = os.path.join(VIDEO_STORAGE_PATH, "temp/") # 临时分片目录HLS_STORAGE_PATH = os.path.join(VIDEO_STORAGE_PATH, "hls/") # HLS转码后存储目录# 上传配置MAX_SHARD_SIZE = 2 * 1024 * 1024 # 分片大小(2MB)ALLOWED_VIDEO_TYPES = ["video/mp4", "video/avi", "video/mkv", "video/mov", "video/webm"] # 允许上传的视频类型# 权限配置(示例:有效Token列表,生产环境需对接数据库)VALID_TOKENS = {"user_token_123", "admin_token_456"}# 创建目录(确保目录存在)for path in [VIDEO_STORAGE_PATH, TEMP_SHARD_PATH, HLS_STORAGE_PATH]:
if not os.path.exists(path):
os.makedirs(path, mode=0o755)4. 核心功能实现
(1)工具函数(utils/file_utils.py)
import osimport mimetypesfrom app.config import VIDEO_STORAGE_PATH, HLS_STORAGE_PATHdef get_video_mime_type(file_path: str) -> str:
"""获取文件MIME类型"""
mime_type, _ = mimetypes.guess_type(file_path)
if not mime_type:
if file_path.endswith(".m3u8"):
return "application/x-mpegURL"
elif file_path.endswith(".ts"):
return "video/MP2T"
else:
return "application/octet-stream"
return mime_typedef get_video_abs_path(file_name: str = None, hls_path: str = None) -> str:
"""获取视频文件绝对路径(支持普通视频和HLS文件)"""
if file_name:
return os.path.join(VIDEO_STORAGE_PATH, file_name)
elif hls_path:
return os.path.join(HLS_STORAGE_PATH, hls_path)
raise ValueError("必须提供file_name或hls_path")(2)权限验证(utils/auth_utils.py)
from fastapi import Header, HTTPExceptionfrom app.config import VALID_TOKENSdef verify_token(token: str = Header(None)): """验证播放权限(通过Token)""" if not token or token not in VALID_TOKENS: raise HTTPException(status_code=403, detail="无播放权限,请提供有效Token") return token
(3)视频上传服务(services/upload.py)
import osimport hashlibimport asynciofrom fastapi import UploadFile, HTTPExceptionfrom app.config import TEMP_SHARD_PATH, VIDEO_STORAGE_PATH, HLS_STORAGE_PATH, MAX_SHARD_SIZEfrom app.services.transcode import generate_hlsasync def upload_video_shard(
file: UploadFile,
file_name: str,
chunk_index: int,
total_chunks: int) -> dict:
"""分片上传视频,所有分片上传完成后合并并转码"""
# 验证文件类型
if file.content_type not in ["video/mp4", "video/avi", "video/mkv", "video/mov", "video/webm"]:
raise HTTPException(status_code=400, detail="不支持的视频格式")
# 生成唯一文件标识(避免文件名重复)
file_hash = hashlib.md5((file_name + str(os.path.getsize(file.file))).encode()).hexdigest()
shard_dir = os.path.join(TEMP_SHARD_PATH, file_hash + "/")
# 确保分片目录存在
if not os.path.exists(shard_dir):
os.makedirs(shard_dir, mode=0o755)
# 保存当前分片(异步写入文件)
shard_file_path = os.path.join(shard_dir, str(chunk_index))
try:
async with aiofiles.open(shard_file_path, "wb") as f:
content = await file.read()
await f.write(content)
except Exception as e:
raise HTTPException(status_code=500, detail=f"分片保存失败:{str(e)}")
# 检查是否所有分片上传完成
if chunk_index == total_chunks - 1:
# 合并分片
final_file_ext = file_name.split(".")[-1]
final_file_name = f"{file_hash}.{final_file_ext}"
final_file_path = os.path.join(VIDEO_STORAGE_PATH, final_file_name)
try:
# 按索引顺序合并所有分片
async with aiofiles.open(final_file_path, "wb") as final_f:
for i in range(total_chunks):
current_shard_path = os.path.join(shard_dir, str(i))
if not os.path.exists(current_shard_path):
raise HTTPException(status_code=500, detail=f"分片{i}缺失,合并失败")
async with aiofiles.open(current_shard_path, "rb") as shard_f:
await final_f.write(await shard_f.read())
# 删除临时分片
os.remove(current_shard_path)
# 删除临时分片目录
os.rmdir(shard_dir)
except Exception as e:
raise HTTPException(status_code=500, detail=f"分片合并失败:{str(e)}")
# 后台异步转码为HLS格式(不阻塞当前请求)
asyncio.create_task(generate_hls(final_file_path, os.path.join(HLS_STORAGE_PATH, file_hash + "/")))
# 返回播放地址
play_url = f"http://{os.getenv('SERVER_HOST', 'localhost')}:{os.getenv('SERVER_PORT', 8000)}/api/video/play?file_name={final_file_name}"
hls_play_url = f"http://{os.getenv('SERVER_HOST', 'localhost')}:{os.getenv('SERVER_PORT', 8000)}/api/video/play?hls_path={file_hash}/index.m3u8"
return {
"code": 200,
"msg": "上传完成,转码中(HLS格式稍后可用)",
"data": {
"play_url": play_url,
"hls_play_url": hls_play_url,
"file_name": final_file_name }
}
return {"code": 200, "msg": f"分片{chunk_index}上传成功", "data": {}}(4)视频转码服务(services/transcode.py)
import osimport subprocessfrom app.config import HLS_STORAGE_PATHasync def generate_hls(source_path: str, target_dir: str):
"""使用FFmpeg将视频转码为HLS格式(.m3u8 + ts分片)"""
# 确保目标目录存在
if not os.path.exists(target_dir):
os.makedirs(target_dir, mode=0o755)
# FFmpeg命令:10秒分片,支持自适应码率,无分片数量限制
cmd = [
"ffmpeg",
"-i", source_path,
"-profile:v", "baseline", # 兼容移动端
"-level", "3.0",
"-hls_time", "10", # 分片时长(10秒)
"-hls_list_size", "0", # 0表示无限制,显示所有分片
"-hls_segment_filename", os.path.join(target_dir, "segment_%03d.ts"), # ts分片命名格式
"-f", "hls",
os.path.join(target_dir, "index.m3u8") # 输出m3u8索引文件
]
try:
# 执行命令(后台运行,捕获输出)
subprocess.run(
cmd,
check=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
print(f"HLS转码成功:{target_dir}index.m3u8")
except subprocess.CalledProcessError as e:
print(f"HLS转码失败:{e.stderr}")
# 可选:删除失败的转码文件
if os.path.exists(target_dir):
for file in os.listdir(target_dir):
os.remove(os.path.join(target_dir, file))
os.rmdir(target_dir)(5)视频播放服务(services/play.py)
import osfrom fastapi import HTTPException, Response, statusfrom fastapi.responses import StreamingResponsefrom app.utils.file_utils import get_video_abs_path, get_video_mime_typeasync def stream_video(file_name: str = None, hls_path: str = None):
"""流式播放视频(支持Range请求,断点续传)"""
# 获取视频绝对路径
try:
video_path = get_video_abs_path(file_name, hls_path)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
# 检查文件是否存在
if not os.path.exists(video_path) or not os.path.isfile(video_path):
raise HTTPException(status_code=404, detail="视频文件不存在")
file_size = os.path.getsize(video_path)
mime_type = get_video_mime_type(video_path)
# 解析Range请求头(格式:bytes=0-1023 或 bytes=1024-)
range_header = os.getenv("HTTP_RANGE", "")
start = 0
end = file_size - 1
if range_header.startswith("bytes="):
range_str = range_header.split("=")[1]
range_parts = range_str.split("-")
start = int(range_parts[0]) if range_parts[0] else 0
end = int(range_parts[1]) if len(range_parts) > 1 and range_parts[1] else file_size - 1
# 修正范围(避免超出文件大小)
start = min(start, end)
end = min(end, file_size - 1)
# 计算响应长度
response_length = end - start + 1
# 构建响应头
headers = {
"Content-Type": mime_type,
"Content-Length": str(response_length),
"Content-Range": f"bytes {start}-{end}/{file_size}",
"Accept-Ranges": "bytes",
"Cache-Control": "public, max-age=86400" # 缓存1天
}
# 流式读取文件并返回
async def file_stream():
async with aiofiles.open(video_path, "rb") as f:
await f.seek(start)
remaining = response_length
chunk_size = 4096 # 4KB缓冲区
while remaining > 0:
read_size = min(chunk_size, remaining)
data = await f.read(read_size)
if not data:
break
yield data
remaining -= read_size
return StreamingResponse(
file_stream(),
status_code=status.HTTP_206_PARTIAL_CONTENT,
headers=headers )(6)主程序路由(main.py)
from fastapi import FastAPI, UploadFile, File, Form, Query, Dependsfrom fastapi.middleware.cors import CORSMiddlewarefrom app.config import HOST, PORTfrom app.services.upload import upload_video_shardfrom app.services.play import stream_videofrom app.utils.auth_utils import verify_token# 创建FastAPI应用app = FastAPI(title="Python 在线视频播放系统", version="1.0.0")# 允许跨域(开发环境,生产环境限制具体域名)app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],)# 健康检查接口@app.get("/health")async def health_check():
return {"status": "healthy", "message": "视频服务正常运行"}# 视频分片上传接口@app.post("/api/video/upload/shard", summary="分片上传视频")async def upload_shard(
file: UploadFile = File(...),
file_name: str = Form(...),
chunk_index: int = Form(...),
total_chunks: int = Form(...)):
return await upload_video_shard(file, file_name, chunk_index, total_chunks)# 视频播放接口(带权限验证)@app.get("/api/video/play", summary="流式播放视频(支持断点续传)")async def play_video(
file_name: str = Query(None, description="普通视频文件名(如xxx.mp4)"),
hls_path: str = Query(None, description="HLS文件路径(如xxx/index.m3u8)"),
token: str = Depends(verify_token)):
return await stream_video(file_name, hls_path)# 运行服务if __name__ == "__main__":
import uvicorn
uvicorn.run("app.main:app", host=HOST, port=PORT, reload=True) # reload=True仅用于开发环境三、前端实现(HTML + Video.js)
1. 视频上传页面(upload.html)
<!DOCTYPE html><html lang="zh-CN"><head>
<meta charset="UTF-8">
<title>视频上传</title>
<style>
.container { width: 80%; max-width: 800px; margin: 50px auto; }
.progress { width: 100%; height: 24px; border-radius: 12px; background: #eee; margin: 20px 0; overflow: hidden; }
.progress-bar { height: 100%; background: #2196F3; width: 0%; transition: width 0.3s ease; }
.info { margin: 10px 0; color: #666; }
button { padding: 10px 20px; background: #2196F3; color: white; border: none; border-radius: 4px; cursor: pointer; }
button:disabled { background: #ccc; cursor: not-allowed; }
</style></head><body>
<div class="container">
<h1>视频分片上传</h1>
<input type="file" id="videoFile" accept="video/*" onchange="handleFileSelect(event)" />
<div id="fileInfo" class="info" style="display: none;">
<p>文件名:<span id="fileName"></span></p>
<p>文件大小:<span id="fileSize"></span></p>
</div>
<button id="uploadBtn" disabled onclick="startUpload()">开始上传</button>
<div class="progress">
<div class="progress-bar" id="progressBar"></div>
</div>
<p>上传进度:<span id="progressText">0%</span></p>
<p id="statusText" class="info"></p>
</div>
<script>
const fileInput = document.getElementById("videoFile");
const fileInfo = document.getElementById("fileInfo");
const fileNameEl = document.getElementById("fileName");
const fileSizeEl = document.getElementById("fileSize");
const uploadBtn = document.getElementById("uploadBtn");
const progressBar = document.getElementById("progressBar");
const progressText = document.getElementById("progressText");
const statusText = document.getElementById("statusText");
let file = null;
const chunkSize = 2 * 1024 * 1024; // 2MB分片(与后端一致)
const serverUrl = "http://localhost:8000"; // 后端服务地址
// 选择文件后显示信息
function handleFileSelect(event) {
file = event.target.files[0];
if (!file) return;
fileInfo.style.display = "block";
fileNameEl.textContent = file.name;
fileSizeEl.textContent = formatFileSize(file.size);
uploadBtn.disabled = false;
progressBar.style.width = "0%";
progressText.textContent = "0%";
statusText.textContent = "";
}
// 格式化文件大小
function formatFileSize(size) {
if (size < 1024 * 1024) return (size / 1024).toFixed(2) + "KB";
if (size < 1024 * 1024 * 1024) return (size / (1024 * 1024)).toFixed(2) + "MB";
return (size / (1024 * 1024 * 1024)).toFixed(2) + "GB";
}
// 开始分片上传
async function startUpload() {
if (!file) return alert("请选择视频文件");
uploadBtn.disabled = true;
statusText.textContent = "正在上传...";
const totalSize = file.size;
const totalChunks = Math.ceil(totalSize / chunkSize);
const fileName = file.name;
try {
for (let chunkIndex = 0; chunkIndex < totalChunks; chunkIndex++) {
// 计算当前分片的起始和结束位置
const start = chunkIndex * chunkSize;
const end = Math.min(start + chunkSize, totalSize);
const chunk = file.slice(start, end);
// 构建表单数据
const formData = new FormData();
formData.append("file", chunk);
formData.append("file_name", fileName);
formData.append("chunk_index", chunkIndex);
formData.append("total_chunks", totalChunks);
// 上传单个分片
await uploadChunk(formData, chunkIndex, totalChunks);
}
statusText.textContent = "上传完成!HLS格式转码中,稍后可通过HLS地址播放";
} catch (error) {
statusText.textContent = `上传失败:${error.message}`;
uploadBtn.disabled = false;
}
}
// 上传单个分片(带进度监听)
function uploadChunk(formData, chunkIndex, totalChunks) {
return new Promise((resolve, reject) => {
const xhr = new XMLHttpRequest();
xhr.open("POST", `${serverUrl}/api/video/upload/shard`);
// 监听上传进度
xhr.upload.addEventListener("progress", (e) => {
if (e.lengthComputable) {
const chunkProgress = e.loaded / e.total;
const totalProgress = Math.floor((chunkIndex + chunkProgress) / totalChunks * 100);
progressBar.style.width = `${totalProgress}%`;
progressText.textContent = `${totalProgress}%`;
}
});
// 监听请求完成
xhr.addEventListener("load", () => {
if (xhr.status >= 200 && xhr.status < 300) {
const res = JSON.parse(xhr.responseText);
statusText.textContent = res.msg;
resolve(res);
} else {
const res = JSON.parse(xhr.responseText || "{}");
reject(new Error(res.msg || "分片上传失败"));
}
});
// 监听错误
xhr.addEventListener("error", () => {
reject(new Error("网络错误,分片上传失败"));
});
xhr.send(formData);
});
}
</script></body></html>2. 视频播放页面(play.html)
<!DOCTYPE html><html lang="zh-CN"><head>
<meta charset="UTF-8">
<title>视频播放</title>
<!-- 引入Video.js样式和脚本 -->
<link href="https://vjs.zencdn.net/8.6.1/video-js.css" rel="stylesheet">
<script src="https://vjs.zencdn.net/8.6.1/video.min.js"></script>
<!-- 引入HLS插件(支持.m3u8格式) -->
<script src="https://cdn.jsdelivr.net/npm/videojs-contrib-hls@5.15.0/dist/videojs-contrib-hls.min.js"></script>
<style>
.container { width: 90%; max-width: 1200px; margin: 50px auto; }
.video-js { width: 100%; height: auto; }
</style></head><body>
<div class="container">
<h1>视频播放</h1>
<!-- Video.js播放器容器 -->
<video
id="videoPlayer"
class="video-js vjs-big-play-centered vjs-fluid"
controls
preload="auto"
poster="https://via.placeholder.com/1200x675?text=视频封面"
data-setup='{"responsive": true, "fluid": true}'
></video>
</div>
<script>
// 后端服务地址和播放配置
const serverUrl = "http://localhost:8000";
const token = "user_token_123"; // 有效Token(与后端VALID_TOKENS一致)
const videoType = "hls"; // 可选:mp4/hls(根据实际视频格式选择)
// 初始化播放器
const player = videojs("videoPlayer", {
autoplay: false, // 禁止自动播放(浏览器政策限制)
controls: true, // 显示控制栏
playbackRates: [0.5, 1.0, 1.5, 2.0], // 倍速播放选项
muted: false, // 默认不静音
responsive: true,
fluid: true
});
// 构建播放地址(带Token参数)
let videoUrl = "";
if (videoType === "mp4") {
// 普通MP4格式(替换为实际文件名)
const fileName = "xxx.mp4";
videoUrl = `${serverUrl}/api/video/play?file_name=${fileName}&token=${token}`;
player.src({ src: videoUrl, type: "video/mp4" });
} else if (videoType === "hls") {
// HLS格式(替换为实际HLS路径)
const hlsPath = "xxx/index.m3u8";
videoUrl = `${serverUrl}/api/video/play?hls_path=${hlsPath}&token=${token}`;
player.src({ src: videoUrl, type: "application/x-mpegURL" });
}
// 错误处理
player.on("error", () => {
const error = player.error();
console.error("播放失败:", error);
alert(`视频播放失败:${error.message || "未知错误"}`);
});
// 播放状态监听
player.on("play", () => {
console.log("开始播放");
});
player.on("pause", () => {
console.log("暂停播放");
});
player.on("ended", () => {
console.log("播放完成");
alert("视频播放完成!");
});
</script></body></html>四、生产环境优化与部署
1. 性能优化
存储优化:
放弃本地存储,使用阿里云 OSS / 腾讯云 COS,配置 CDN 加速(如阿里云 CDN、Cloudflare),降低源站压力,提升跨地域访问速度。
视频文件上传直接对接云存储 SDK(如
aliyun-oss-python-sdk),避免后端服务器中转,节省带宽和存储成本。转码优化:
多码率转码:通过 FFmpeg 生成 720p、1080p 等多码率的 HLS 文件,支持弱网环境下自动切换码率。
异步任务队列:使用 Celery+Redis 替代
asyncio.create_task,处理大量视频转码任务,支持任务重试、优先级排序。缓存优化:
视频元信息(大小、格式、播放地址)缓存到 Redis,减少文件系统查询。
配置 CDN 缓存策略,对视频分片和 HLS 索引文件设置较长缓存时间(如 7 天)。
并发优化:
生产环境使用
uvicorn搭配gunicorn作为 WSGI 服务器,配置多进程多线程:bash运行gunicorn -w 4 -k uvicorn.workers.UvicornWorker app.main:app --bind 0.0.0.0:8000
启用 FastAPI 的
limit_concurrency限制最大并发数,避免服务器过载。
2. 部署注意事项
Python 环境配置:
使用虚拟环境(
venv/conda)隔离依赖,避免版本冲突。安装生产环境依赖时添加
--no-dev参数,排除开发工具。服务器配置:
调整系统文件描述符限制(
ulimit -n 65535),支持更多并发连接。配置 Nginx 反向代理 FastAPI 服务,处理静态资源和负载均衡:
nginxserver { listen 80; server_name your-domain.com; # 反向代理API请求 location /api/ { proxy_pass http://127.0.0.1:8000; proxy_set_header Host $host; proxy_set_header X-Real-IP $remote_addr; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; } # 静态资源(如前端页面) location / { root /www/python-video-player/frontend; index index.html; }}安全配置:
播放接口 Token 改为 JWT(
pyjwt),支持过期时间和刷新机制,避免固定 Token 泄露。视频存储目录设置为非 Web 可访问路径,通过后端接口间接访问,防止文件遍历攻击。
启用 HTTPS(Let's Encrypt 免费证书),加密传输,避免视频内容被篡改或盗链。
限制上传文件大小和类型,防止恶意文件上传。
3. 常见问题排查
视频无法播放:
检查 Token 是否有效,JWT 是否过期。
检查视频格式和 MIME 类型是否匹配(如 HLS 格式需返回
application/x-mpegURL)。检查跨域配置是否正确,前端和后端是否都允许对应的域名和请求头。
分片上传失败:
检查服务器磁盘空间或云存储权限是否充足。
确认分片大小与后端一致,避免分片索引错乱。
大文件上传建议使用 HTTPS,避免网络不稳定导致的分片丢失。
HLS 转码失败:
确认 FFmpeg 已安装且在系统环境变量中(
ffmpeg -version验证)。检查视频文件是否损坏,尝试用本地播放器打开测试。
查看转码日志,排查 FFmpeg 命令参数错误或资源不足(CPU / 内存)。