本文主要是介绍Python FastAPI+Celery+RabbitMQ实现分布式图片水印处理系统,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
《PythonFastAPI+Celery+RabbitMQ实现分布式图片水印处理系统》这篇文章主要为大家详细介绍了PythonFastAPI如何结合Celery以及RabbitMQ实现简单的分布式...
实现思路
- FastAPI 服务器
- Celery 任务队列
- RabbitMQ 作为消息代理
- 定时任务处理
完整步骤
首先创建项目结构:
c:\Users\Administrator\Desktop\meitu\
├── app/
│ ├── __init__.py
│ ├── main.py
│ ├── celery_app.py
│ ├── tasks.py
│ └── config.py
├── requirements.txt
└── celery_worker.py
1.首先创建 requirements.txt:
2.创建配置文件:
from dotenv import load_dotenv
import os
load_dotenv()
# RabbitMQ配置
RABBITMQ_HOST = os.getenv("RABBITMQ_HOST", "localhost")
RABBITMQ_PORT = os.getenv("RABBITMQ_PORT", "5672")
RABBITMQ_USER = os.getenv("RABBITMQ_USER", "guest")
RABBITMQ_PASS = os.getenv("RABBITMQ_PASS", "guest")
# Celery配置
CELERY_BROKER_URL = f"amqp://{RABBITMQ_USER}:{RABBITMQ_PASS}@{RABBITMQ_HOST}:{RABBITMQ_PORT}//"
CELERY_RESULT_BACKEND = "rpc://"
# 定时任务配置
CELERY_BEAT_SCHEDULE = {
'process-images-every-hour': {
'task': 'app.tasks.process_images',
'schedule': 3600.0, # 每小时执行一次
},
'daily-cleanup': {
'task': 'app.tasks.cleanup_old_images',
'schedule': 86400.0, # 每天执行一次
}
}
3.创建 Celery 应用:
from celery import Celery from app.configphp import CELERY_BROKER_URL, CELERY_RESULT_BACKEND, CELERY_BEAT_SCHEDULE celery_app = Celery( 'image_processing', broker=CELERY_BROKER_URL, backend=CELERY_RESULT_BACKEND, include=['app.tasks'] ) # 配置定时任务 celery_app.conf.beat_schedule = CELERY_BEAT_SCHEDULE celery_app.conf.timezone = 'Asia/Shanghai'
4.创建任务文件:
from app.celery_app import celery_app
from app.watermark import ImageWatermarker
import os
from datetime import datetime, timedelta
@celery_app.task
def add_watermark_task(image_path, text, position='center'):
"""异步添加水印任务"""
watermarker = ImageWatermarker()
try:
result_path = watermarker.add_watermark(
image_path=image_path,
text=text,
www.chinasem.cn position=position
)
return {"status": "success", "output_path": result_path}
except Exception as e:
return {"status": "error", "message": str(e)}
@celery_app.task
def process_images():
"""定时处理图片任务"""
image_dir = "images/pending"
if not os.path.exists(image_dir):
return {"status": "error", "message": "Pending directory not found"}
processed = 0
for image in os.listdir(image_dir):
if image.lower().endswith(('.png', '.jpg', '.jpeg')):
add_watermark_task.delay(
os.path.join(image_dir, image),
"自动处理水印",
'center'
)
processed += 1
return {"status": "success", "processed": processed}
@celery_app.task
def cleanup_old_images():
"""清理旧图片任务"""
output_dir = "images/processed"
if not os.path.exists(output_dir):
return {"status": "error", "message": "Output directory not found"}
threshold_date = datetime.now() - timedelta(days=7)
cleaned = 0
for image in os.listdir(output_dir):
image_path = os.path.join(output_dir, image)
if os.path.getctime(image_path) < threshold_date.timestamp():
os.remove(image_path)
cleaned += 1
return {"status": "success", "cleaned": cleaned}
5.创建 FastAPI 应用:
from fastapi import FastAPI, File, UploadFile, BackgroundTasks from fastapi.responses import jsONResponse import os from app.tasks import add_watermark_task from app.celery_app import celery_app app = FastAPI(title="图片水印处理服务") @aphppp.post("/upload/") async def upload_image( file: UploadFile = File(...), text: str = "水印文本", position: str = "center" ): # 保存上传的文件 file_path = f"images/uploads/{file.filename}" os.makedirs(os.path.dirname(file_path), exist_ok=True) with open(file_path, "wb") as buffer: content = await file.read() buffer.write(content) # 创建异步任务 task = add_watermark_task.delay(file_path, text, position) return JSONResponse({ "status": "success", "message": "图片已上传并加入处理队列", "task_id": task.id }) @app.get("/task/{task_id}") async def get_task_status(task_id: str): task = celery_app.AsyncResult(task_id) if task.ready(): return {"status": "completed", "result": task.result} return {"status": "processing"} @app.get("/tasks/scheduled") async def get_scheduledChina编程_tasks(): return {"tasks": celery_app.conf.beat_schedule}
6.创建 Celery worker 启动文件:
from app.celery_app import celery_app
if __name__ == '__main__':
celery_app.start()
使用说明
首先安装依赖:
pip install -r requirements.txt
确保 RabbitMQ 服务已启动
启动 FastAPI 服务器:
uvicorn app.main:app --reload
启动 Celery worker:
celery -A celery_worker.celery_app worker --loglevel=info
启动 Celery beat(定时任务):
celery -A celery_worker.celery_app beat --loglevel=info
这个系统提供以下功能:
- 通过 FastAPI 接口上传图片并异步处理水印
- 使用 Celery 处理异步任务队列
- 使用 RabbitMQ 作为消息代理
- 支持定时任务:
- 每小时自动处理待处理图片
- 每天清理一周前的旧图片
- 支持任务状态查询
- 支持查看计划任务列表
API 端点:
- POST /upload/ - 上传图片并创建水印任务
- GET /task/{task_id} - 查询任务状态
- GET /tasks/scheduled - 查看计划任务列表
以上就是Python FastAPI+Celery+RabbitMQ实现分布式图片水印处理系统的详细内容,更多关于Python图片水印的资料请关注China编程(www.chinasem.cn)其它相关文章!
这篇关于Python FastAPI+Celery+RabbitMQ实现分布式图片水印处理系统的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!