本文主要是介绍Python一次性将指定版本所有包上传PyPI镜像解决方案,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
《Python一次性将指定版本所有包上传PyPI镜像解决方案》本文主要介绍了一个安全、完整、可离线部署的解决方案,用于一次性准备指定Python版本的所有包,然后导出到内网环境,感兴趣的小伙伴可以跟随...
根据您的需求和知识库中的警告信息(清华镜像会阻断大量下载行为),我设计了一个安全、完整、可离线部署的解决方案,用于一次性准备指定python版本的所有包,然后导出到内网环境。
为什么需要这个方案
知识库明确显示:清华镜像会检测到大量下载行为并阻断请求。直接尝试下载所有包会触发此限制,导致下载失败。我们需要一个谨慎、分阶段、有间隔的下载策略。
完整解决方案
1. 项目目录结构
mkdir -p pypi-offline/{config,scripts,packages} cd pypi-offline
2. 创建智能下载脚本
scripts/safe_download.py
#!/usr/bin/env python3 import os import sys import time import random import requests import argparse from pathlib import Path from concurrent.futures import ThreadPoolExecutor, as_completed # 配置参数 MAX_RETRIES = 3 MIN_DELAY = 5 # 最小延迟(秒) MAX_DELAY = 15 # 最大延迟(秒) MAX_WORKERS = 2 # 最大并发数(避免触发限制) # PyPI API端点 PYPI_SIMPLE = "https://pypi.org/simple" # 使用官方PyPI而非清华镜像,避免触发限制 # 如果网络条件好,可考虑使用其他镜像 def get_package_info(package_name): """获取包的元信息""" url = f"https://pypi.org/pypi/{package_name}/json" for i in range(MAX_RETRIES): try: response = requests.get(url, timeout=30) if response.status_code == 200: return response.json() elif response.status_code == 404: return None time.sleep((i + 1) * 2) except Exception as e: print(f"获取包 {package_name} 信息失败: {str(e)}") time.sleep((i + 1) * 5) return None def filter_packages_for_python(package_data, python_version): """过滤出兼容指定Python版本的包""" compatible_files = [] py_ver = python_version.replace('.', '') for file_info in package_data['urls']: # 检查Python版本兼容性 py_tag = file_info.get('python_version', '') if py_tag == 'source' or py_tag.startswith('py') or py_tag.startswith('cp' + py_ver): compatible_files.append(file_info) return compatible_files def download_package(package_name, file_info, target_dir): """安全下载单个包文件""" file_url = file_info['url'] file_name = file_info['filename'] target_path = os.path.join(target_dir, file_name) # 如果文件已存在,跳过 if os.path.exists(target_path): print(f"跳过已存在的文件: {file_name}") return True print(f"下载: {file_name} ({packahttp://www.chinasem.cnge_name})") for i in range(MAX_RETRIES): try: response = requests.get(file_url, stream=True, timeout=60) if China编程response.status_code == 200: with open(target_path, 'wb') as f: for chunk in response.iter_content(chunk_size=8192): f.write(chunk) print(f"成功下载: {file_name}") return True print(f"下载失败 ({response.status_code}): {file_name}") time.sleep((i + 1) * 5) except Exception as e: print(f"下载 {file_name} 失败: {str(e)}") time.sleep((i + 1) * 5) # 下载失败,删除可能的部分文件 if os.path.exists(target_path): os.remove(target_path) return False def get_all_packages(): """获取所有包的列表""" print("获取所有包的列表...") response = requests.get(f"{PYPI_SIMPLE}/", timeout=30) if response.status_code != 200: raise Exception(f"无法获取包列表: HTTP {response.status_code}") # 解析html获取包名 import re package_names = re.findall(r'<a href="/simple/([^" rel="external nofollow" /]+)">\1</a>', response.text) return package_names def main(): parser = argparse.ArgumentParser(description='安全下载指定Python版本的所有包') parser.add_argument('--python-version', required=True, help='目标Python版本 (如: 3.8)') parser.add_argument('--output-dir', default='../packages', help='输出目录') args = parser.parse_args() # 创建输出目录 output_dir = Path(args.output_dir) output_dir.mkdir(parents=True, exist_ok=True) print(f"输出目录: {output_dir}") # 获取所有包列表 try: all_packages = get_all_packages() print(f"找到 {len(all_packages)} 个包") except Exception as e: print(f"获取包列表失败: {str(e)}") print("请尝试使用清华镜像的简单页面 API (需要处理 HTML)") # 作为备选方案,可以使用清华镜像的简单页面 # all_packages = get_packages_from_tuna() return 1 # 处理包 success_count = 0 failed_packages = [] # 分批次处理,避免一次性太多请求 BATch_size = 50 for i in range(0, len(all_packages), batch_size): batch = all_packages[i:i+batch_size] print(f"\n处理包批次 {i//batch_size + 1} ({len(batch)} 个包)") with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor: future_to_pkg = {} for pkg in batch: pkg_data = get_package_info(pkg) if pkg_data: compatible_files = filter_packages_for_python(pkg_data, args.python_version) if compatible_files: for file_info in compatible_files: future = executor.submit( download_package, pkg, file_info, output_dir ) future_to_pkg[future] = pkg # 添加随机延迟,避免触发限制 time.sleep(random.uniform(MIN_DELAY, MAX_DELAY)) # 等待并处理结果 for future in as_completed(future_to_pkg): pkg = future_to_pkg[future] try: if future.result(): success_count += 1 else: failed_packages.append(pkg) except Exception as e: print(f"处理包 {pkg} 时出错: {str(e)}") failed_packages.append(pkg) # 批次间额外延迟 print(f"\n批次处理完成,等待 {MAX_DELAY*2} 秒...") time.sleep(MAX_DELAY * 2) # 生成报告 print("\n===== 下载完成 =====") print(f"成功: {success_count} 个包") print(f"失败: {len(failed_packages)} 个包") if failed_packages: print("\n失败的包列表 (可后续重试):") for pkg in failed_www.chinasem.cnpackages[:20]: # 只显示前20个 print(f"- {pkg}") if len(failed_packages) > 20: print(f"... 及其他 {len(failed_packages) - 20} 个包") # 保存失败列表以便重试 with open(output_dir / 'failed_packages.txt', 'w') as f: for pkg in failed_packages: China编程 f.write(pkg + '\n') return 0 if __name__ == "__main__": sys.exit(main())
3. 创建包清单生成脚本
scripts/generate_simple_index.py
#!/usr/bin/env python3 import os import sys from pathlib import Path import re import html def generate_simple_index(packages_dir, output_dir): """生成符合PEP 503的simple index""" packages_dir = Path(packages_dir) output_dir = Path(output_dir) # 创建输出目录 output_dir.mkdir(parents=True, exist_ok=True) # 收集所有包名 package_names = set() for filename in os.listdir(packages_dir): # 从文件名提取包名 (PEP 427 格式) match = re.match(r'^([a-zA-Z0-9_-]+)(?:-[a-zA-Z0-9_.+-]+)?\.(?:whl|tar\.gz|zip)$', filename) if match: package_name = match.group(1).lower() # 规范化包名 (PEP 503) package_name = package_name.replace('_', '-') package_names.add(package_name) # 为每个包生成索引页面 for package in package_names: index_content = f'<html><head><title>Links for {package}</title></head>\n' index_content += f'<body>\n<h1>Links for {package}</h1>\n' # 找出该包的所有文件 for filename in os.listdir(packages_dir): http://www.chinasem.cn if re.match(f'^{re.escape(package)}(?:-[a-zA-Z0-9_.+-]+)?\.(?:whl|tar\.gz|zip)$', filename, re.IGNORECASE): file_url = f'../{filename}' index_content += f'<a href="{file_url}" rel="external nofollow" >{html.escape(filename)}</a> \n' index_content += '</body></html>' # 保存索引页面 package_dir = output_dir / package package_dir.mkdir(exist_ok=True) with open(package_dir / 'index.html', 'w', encoding='utf-8') as f: f.write(index_content) # 生成根索引页面 root_index = '<html><head><title>Simple Index</title></head>\n' root_index += '<body>\n<h1>Simple Index</h1>\n' for package in sorted(package_names): root_index += f'<a href="{package}/" rel="external nofollow" >{html.escape(package)}</a> \n' root_index += '</body></html>' with open(output_dir / 'index.html', 'w', encoding='utf-8') as f: f.write(root_index) print(f"成功生成 {len(package_names)} 个包的索引") if __name__ == "__main__": if len(sys.argv) != 3: print("用法: python generate_simple_index.py <packages_dir> <output_dir>") sys.exit(1) generate_simple_index(sys.argv[1], sys.argv[2])
4. 创建 Docker Compose 配置docker-compose.yml
version: '3.8' services: pypi-offline: image: python:3.9-slim container_name: pypi-offline ports: - "8080:8000" volumes: - ./web:/usr/src/app - ./packages:/usr/src/app/packages working_dir: /usr/src/app command: > sh -c "python -m http.server 8000 --directory /usr/src/app" restart: unless-stopped
5. 创建 Web 服务目录结构
mkdir -p web/{simple,packages}
6. 创建准备脚本
prepare_offline_mirror.sh
#!/bin/bash # 准备离线PyPI镜像 PYTHON_VERSION=$1 if [ -z "$PYTHON_VERSION" ]; then echo "用法: $0 <python版本>" echo "示例: $0 3.8" exit 1 fi # 检查依赖 if ! command -v python3 &> /dev/null; then echo "错误: 需要安装Python 3" exit 1 fi # 1. 安全下载所有兼容指定Python版本的包 echo "步骤1: 安全下载所有兼容Python $PYTHON_VERSION 的包" python3 scripts/safe_download.py --python-version "$PYTHON_VERSION" --output-dir packages # 2. 生成simple index echo "步骤2: 生成PEP 503兼容的simple index" python3 scripts/generate_simple_index.py packages web/simple # 3. 创建README cat > web/README.md << EOF # 离线PyPI镜像 此目录包含Python $PYTHON_VERSION 的完整PyPI镜像。 ## 使用方法 1. 启动服务: docker-compose up -d 2. 客户端使用: pip install 包名 -i http://your-server:8080/simple EOF echo "离线PyPI镜像准备完成!" echo "现在可以将整个pypi-offline目录复制到内网环境" echo "在内网环境中执行: docker-compose up -d 启动服务"
完整操作流程
1. 在可联网的环境中准备镜像
# 克隆项目 git clone https://github.com/your-repo/pypi-offline.git cd pypi-offline # 赋予脚本执行权限 chmod +x scripts/*.py prepare_offline_mirror.sh # 准备Python 3.8的完整镜像 (这将需要较长时间) ./prepare_offline_mirror.sh 3.8 # 准备Python 3.9的完整镜像 (可选) # ./prepare_offline_mirror.sh 3.9
2. 验证下载结果
# 检查包数量 ls packages | wc -l # 检查simple index ls web/simple | wc -l
3. 将整个目录复制到内网环境
# 压缩整个目录 tar -czvf pypi-offline-3.8.tar.gz pypi-offline # 将压缩文件传输到内网环境
4. 在内网环境中部署
# 解压 tar -xzvf pypi-offline-3.8.tar.gz cd pypi-offline # 启动服务 docker-compose up -d
5. 内网客户端使用
# 临时使用 pip install numpy -i http://pypi-offline-server:8080/simple # 永久配置 pip config set global.index-url http://pypi-offline-server:8080/simple
方案优势
1.避免触发下载限制:
- 使用低并发(2个线程)
- 随机延迟(5-15秒)
- 批次处理(每50个包)
- 失败重试机制
- 使用官方PyPI而非清华镜像(更宽松)
2.完整镜像:
- 下载指定Python版本的所有兼容包
- 生成符合PEP 503标准的simple index
- 支持所有pip安装方式
3.轻量级部署:
- 仅需Python内置HTTP服务器
- 无需额外依赖
- 完全静态内容,易于复制
4.可验证性:
- 生成下载报告
- 记录失败包以便重试
- 简单的README说明
注意事项
- 下载时间: 完整镜像需要较长时间(可能数天),因为要遵守严格的下载限制
- 存储空间: Python 3.8的完整镜像约需50-100GB空间
- 分阶段下载: 如果中途失败,可从
failed_packages.txt
继续 - 版本选择: 建议选择企业常用的稳定版本(如3.8或3.9)
此方案确保您可以安全地一次性准备完整镜像,然后完全离线使用,完美符合您的内网部署需求。
以上就是Python一次性将指定版本所有包上传PyPI镜像解决方案的详细内容,更多关于Python打包到PyPI的资料请关注编程China编程(www.chinasem.cn)其它相关文章!
这篇关于Python一次性将指定版本所有包上传PyPI镜像解决方案的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!