Python一次性将指定版本所有包上传PyPI镜像解决方案

本文主要是介绍Python一次性将指定版本所有包上传PyPI镜像解决方案,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

《Python一次性将指定版本所有包上传PyPI镜像解决方案》本文主要介绍了一个安全、完整、可离线部署的解决方案,用于一次性准备指定Python版本的所有包,然后导出到内网环境,感兴趣的小伙伴可以跟随...

根据您的需求和知识库中的警告信息(清华镜像会阻断大量下载行为),我设计了一个安全、完整、可离线部署的解决方案,用于一次性准备指定python版本的所有包,然后导出到内网环境。

为什么需要这个方案

知识库明确显示:清华镜像会检测到大量下载行为并阻断请求。直接尝试下载所有包会触发此限制,导致下载失败。我们需要一个谨慎、分阶段、有间隔的下载策略。

完整解决方案

1. 项目目录结构

mkdir -p pypi-offline/{config,scripts,packages}
cd pypi-offline

2. 创建智能下载脚本

scripts/safe_download.py

#!/usr/bin/env python3
import os
import sys
import time
import random
import requests
import argparse
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed

# 配置参数
MAX_RETRIES = 3
MIN_DELAY = 5  # 最小延迟(秒)
MAX_DELAY = 15  # 最大延迟(秒)
MAX_WORKERS = 2  # 最大并发数(避免触发限制)

# PyPI API端点
PYPI_SIMPLE = "https://pypi.org/simple"
# 使用官方PyPI而非清华镜像,避免触发限制
# 如果网络条件好,可考虑使用其他镜像

def get_package_info(package_name):
    """获取包的元信息"""
    url = f"https://pypi.org/pypi/{package_name}/json"
    for i in range(MAX_RETRIES):
        try:
            response = requests.get(url, timeout=30)
            if response.status_code == 200:
                return response.json()
            elif response.status_code == 404:
                return None
            time.sleep((i + 1) * 2)
        except Exception as e:
            print(f"获取包 {package_name} 信息失败: {str(e)}")
            time.sleep((i + 1) * 5)
    return None

def filter_packages_for_python(package_data, python_version):
    """过滤出兼容指定Python版本的包"""
    compatible_files = []
    py_ver = python_version.replace('.', '')
    
    for file_info in package_data['urls']:
        # 检查Python版本兼容性
        py_tag = file_info.get('python_version', '')
        if py_tag == 'source' or py_tag.startswith('py') or py_tag.startswith('cp' + py_ver):
            compatible_files.append(file_info)
    
    return compatible_files

def download_package(package_name, file_info, target_dir):
    """安全下载单个包文件"""
    file_url = file_info['url']
    file_name = file_info['filename']
    target_path = os.path.join(target_dir, file_name)
    
    # 如果文件已存在,跳过
    if os.path.exists(target_path):
        print(f"跳过已存在的文件: {file_name}")
        return True
    
    print(f"下载: {file_name} ({packahttp://www.chinasem.cnge_name})")
    
    for i in range(MAX_RETRIES):
        try:
            response = requests.get(file_url, stream=True, timeout=60)
            if China编程response.status_code == 200:
                with open(target_path, 'wb') as f:
                    for chunk in response.iter_content(chunk_size=8192):
                        f.write(chunk)
                print(f"成功下载: {file_name}")
                return True
            print(f"下载失败 ({response.status_code}): {file_name}")
            time.sleep((i + 1) * 5)
        except Exception as e:
            print(f"下载 {file_name} 失败: {str(e)}")
            time.sleep((i + 1) * 5)
    
    # 下载失败,删除可能的部分文件
    if os.path.exists(target_path):
        os.remove(target_path)
    return False

def get_all_packages():
    """获取所有包的列表"""
    print("获取所有包的列表...")
    response = requests.get(f"{PYPI_SIMPLE}/", timeout=30)
    if response.status_code != 200:
        raise Exception(f"无法获取包列表: HTTP {response.status_code}")
    
    # 解析html获取包名
    import re
    package_names = re.findall(r'<a href="/simple/([^" rel="external nofollow" /]+)">\1</a>', response.text)
    return package_names

def main():
    parser = argparse.ArgumentParser(description='安全下载指定Python版本的所有包')
    parser.add_argument('--python-version', required=True, help='目标Python版本 (如: 3.8)')
    parser.add_argument('--output-dir', default='../packages', help='输出目录')
    args = parser.parse_args()
    
    # 创建输出目录
    output_dir = Path(args.output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)
    print(f"输出目录: {output_dir}")
    
    # 获取所有包列表
    try:
        all_packages = get_all_packages()
        print(f"找到 {len(all_packages)} 个包")
    except Exception as e:
        print(f"获取包列表失败: {str(e)}")
        print("请尝试使用清华镜像的简单页面 API (需要处理 HTML)")
        # 作为备选方案,可以使用清华镜像的简单页面
        # all_packages = get_packages_from_tuna()
        return 1
    
    # 处理包
    success_count = 0
    failed_packages = []
    
    # 分批次处理,避免一次性太多请求
    BATch_size = 50
    for i in range(0, len(all_packages), batch_size):
        batch = all_packages[i:i+batch_size]
        print(f"\n处理包批次 {i//batch_size + 1} ({len(batch)} 个包)")
        
        with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
            future_to_pkg = {}
            
            for pkg in batch:
                pkg_data = get_package_info(pkg)
                if pkg_data:
                    compatible_files = filter_packages_for_python(pkg_data, args.python_version)
                    if compatible_files:
                        for file_info in compatible_files:
                            future = executor.submit(
                                download_package, 
                                pkg, 
                                file_info, 
                                output_dir
                            )
                            future_to_pkg[future] = pkg
                # 添加随机延迟,避免触发限制
                time.sleep(random.uniform(MIN_DELAY, MAX_DELAY))
            
            # 等待并处理结果
            for future in as_completed(future_to_pkg):
                pkg = future_to_pkg[future]
                try:
                    if future.result():
                        success_count += 1
                    else:
                        failed_packages.append(pkg)
                except Exception as e:
                    print(f"处理包 {pkg} 时出错: {str(e)}")
                    failed_packages.append(pkg)
        
        # 批次间额外延迟
        print(f"\n批次处理完成,等待 {MAX_DELAY*2} 秒...")
        time.sleep(MAX_DELAY * 2)
    
    # 生成报告
    print("\n===== 下载完成 =====")
    print(f"成功: {success_count} 个包")
    print(f"失败: {len(failed_packages)} 个包")
    
    if failed_packages:
        print("\n失败的包列表 (可后续重试):")
        for pkg in failed_www.chinasem.cnpackages[:20]:  # 只显示前20个
            print(f"- {pkg}")
        if len(failed_packages) > 20:
            print(f"... 及其他 {len(failed_packages) - 20} 个包")
        
        # 保存失败列表以便重试
        with open(output_dir / 'failed_packages.txt', 'w') as f:
            for pkg in failed_packages:
  China编程              f.write(pkg + '\n')
    
    return 0

if __name__ == "__main__":
    sys.exit(main())

3. 创建包清单生成脚本

scripts/generate_simple_index.py

#!/usr/bin/env python3
import os
import sys
from pathlib import Path
import re
import html

def generate_simple_index(packages_dir, output_dir):
    """生成符合PEP 503的simple index"""
    packages_dir = Path(packages_dir)
    output_dir = Path(output_dir)
    
    # 创建输出目录
    output_dir.mkdir(parents=True, exist_ok=True)
    
    # 收集所有包名
    package_names = set()
    for filename in os.listdir(packages_dir):
        # 从文件名提取包名 (PEP 427 格式)
        match = re.match(r'^([a-zA-Z0-9_-]+)(?:-[a-zA-Z0-9_.+-]+)?\.(?:whl|tar\.gz|zip)$', filename)
        if match:
            package_name = match.group(1).lower()
            # 规范化包名 (PEP 503)
            package_name = package_name.replace('_', '-')
            package_names.add(package_name)
    
    # 为每个包生成索引页面
    for package in package_names:
        index_content = f'<html><head><title>Links for {package}</title></head>\n'
        index_content += f'<body>\n<h1>Links for {package}</h1>\n'
        
        # 找出该包的所有文件
        for filename in os.listdir(packages_dir):
     http://www.chinasem.cn       if re.match(f'^{re.escape(package)}(?:-[a-zA-Z0-9_.+-]+)?\.(?:whl|tar\.gz|zip)$', filename, re.IGNORECASE):
                file_url = f'../{filename}'
                index_content += f'<a href="{file_url}" rel="external nofollow" >{html.escape(filename)}</a>
\n'
        
        index_content += '</body></html>'
        
        # 保存索引页面
        package_dir = output_dir / package
        package_dir.mkdir(exist_ok=True)
        with open(package_dir / 'index.html', 'w', encoding='utf-8') as f:
            f.write(index_content)
    
    # 生成根索引页面
    root_index = '<html><head><title>Simple Index</title></head>\n'
    root_index += '<body>\n<h1>Simple Index</h1>\n'
    
    for package in sorted(package_names):
        root_index += f'<a href="{package}/" rel="external nofollow" >{html.escape(package)}</a>
\n'
    
    root_index += '</body></html>'
    
    with open(output_dir / 'index.html', 'w', encoding='utf-8') as f:
        f.write(root_index)
    
    print(f"成功生成 {len(package_names)} 个包的索引")

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("用法: python generate_simple_index.py <packages_dir> <output_dir>")
        sys.exit(1)
    
    generate_simple_index(sys.argv[1], sys.argv[2])

4. 创建 Docker Compose 配置docker-compose.yml

version: '3.8'

services:
  pypi-offline:
    image: python:3.9-slim
    container_name: pypi-offline
    ports:
      - "8080:8000"
    volumes:
      - ./web:/usr/src/app
      - ./packages:/usr/src/app/packages
    working_dir: /usr/src/app
    command: >
      sh -c "python -m http.server 8000 --directory /usr/src/app"
    restart: unless-stopped

5. 创建 Web 服务目录结构

mkdir -p web/{simple,packages}

6. 创建准备脚本

prepare_offline_mirror.sh

#!/bin/bash
# 准备离线PyPI镜像

PYTHON_VERSION=$1
if [ -z "$PYTHON_VERSION" ]; then
    echo "用法: $0 <python版本>"
    echo "示例: $0 3.8"
    exit 1
fi

# 检查依赖
if ! command -v python3 &> /dev/null; then
    echo "错误: 需要安装Python 3"
    exit 1
fi

# 1. 安全下载所有兼容指定Python版本的包
echo "步骤1: 安全下载所有兼容Python $PYTHON_VERSION 的包"
python3 scripts/safe_download.py --python-version "$PYTHON_VERSION" --output-dir packages

# 2. 生成simple index
echo "步骤2: 生成PEP 503兼容的simple index"
python3 scripts/generate_simple_index.py packages web/simple

# 3. 创建README
cat > web/README.md << EOF
# 离线PyPI镜像

此目录包含Python $PYTHON_VERSION 的完整PyPI镜像。

## 使用方法

1. 启动服务:
   docker-compose up -d

2. 客户端使用:
   pip install 包名 -i http://your-server:8080/simple
EOF

echo "离线PyPI镜像准备完成!"
echo "现在可以将整个pypi-offline目录复制到内网环境"
echo "在内网环境中执行: docker-compose up -d 启动服务"

完整操作流程

1. 在可联网的环境中准备镜像

# 克隆项目
git clone https://github.com/your-repo/pypi-offline.git
cd pypi-offline

# 赋予脚本执行权限
chmod +x scripts/*.py prepare_offline_mirror.sh

# 准备Python 3.8的完整镜像 (这将需要较长时间)
./prepare_offline_mirror.sh 3.8

# 准备Python 3.9的完整镜像 (可选)
# ./prepare_offline_mirror.sh 3.9

2. 验证下载结果

# 检查包数量
ls packages | wc -l

# 检查simple index
ls web/simple | wc -l

3. 将整个目录复制到内网环境

# 压缩整个目录
tar -czvf pypi-offline-3.8.tar.gz pypi-offline

# 将压缩文件传输到内网环境

4. 在内网环境中部署

# 解压
tar -xzvf pypi-offline-3.8.tar.gz
cd pypi-offline

# 启动服务
docker-compose up -d

5. 内网客户端使用

# 临时使用
pip install numpy -i http://pypi-offline-server:8080/simple

# 永久配置
pip config set global.index-url http://pypi-offline-server:8080/simple

方案优势

1.避免触发下载限制:

  • 使用低并发(2个线程)
  • 随机延迟(5-15秒)
  • 批次处理(每50个包)
  • 失败重试机制
  • 使用官方PyPI而非清华镜像(更宽松)

2.完整镜像:

  • 下载指定Python版本的所有兼容包
  • 生成符合PEP 503标准的simple index
  • 支持所有pip安装方式

3.轻量级部署:

  • 仅需Python内置HTTP服务器
  • 无需额外依赖
  • 完全静态内容,易于复制

4.可验证性:

  • 生成下载报告
  • 记录失败包以便重试
  • 简单的README说明

注意事项

  • 下载时间: 完整镜像需要较长时间(可能数天),因为要遵守严格的下载限制
  • 存储空间: Python 3.8的完整镜像约需50-100GB空间
  • 分阶段下载: 如果中途失败,可从failed_packages.txt继续
  • 版本选择: 建议选择企业常用的稳定版本(如3.8或3.9)

此方案确保您可以安全地一次性准备完整镜像,然后完全离线使用,完美符合您的内网部署需求。

以上就是Python一次性将指定版本所有包上传PyPI镜像解决方案的详细内容,更多关于Python打包到PyPI的资料请关注编程China编程(www.chinasem.cn)其它相关文章!

这篇关于Python一次性将指定版本所有包上传PyPI镜像解决方案的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1155921

相关文章

线上Java OOM问题定位与解决方案超详细解析

《线上JavaOOM问题定位与解决方案超详细解析》OOM是JVM抛出的错误,表示内存分配失败,:本文主要介绍线上JavaOOM问题定位与解决方案的相关资料,文中通过代码介绍的非常详细,需要的朋... 目录一、OOM问题核心认知1.1 OOM定义与技术定位1.2 OOM常见类型及技术特征二、OOM问题定位工具

Python的Darts库实现时间序列预测

《Python的Darts库实现时间序列预测》Darts一个集统计、机器学习与深度学习模型于一体的Python时间序列预测库,本文主要介绍了Python的Darts库实现时间序列预测,感兴趣的可以了解... 目录目录一、什么是 Darts?二、安装与基本配置安装 Darts导入基础模块三、时间序列数据结构与

Python正则表达式匹配和替换的操作指南

《Python正则表达式匹配和替换的操作指南》正则表达式是处理文本的强大工具,Python通过re模块提供了完整的正则表达式功能,本文将通过代码示例详细介绍Python中的正则匹配和替换操作,需要的朋... 目录基础语法导入re模块基本元字符常用匹配方法1. re.match() - 从字符串开头匹配2.

Python使用FastAPI实现大文件分片上传与断点续传功能

《Python使用FastAPI实现大文件分片上传与断点续传功能》大文件直传常遇到超时、网络抖动失败、失败后只能重传的问题,分片上传+断点续传可以把大文件拆成若干小块逐个上传,并在中断后从已完成分片继... 目录一、接口设计二、服务端实现(FastAPI)2.1 运行环境2.2 目录结构建议2.3 serv

通过Docker容器部署Python环境的全流程

《通过Docker容器部署Python环境的全流程》在现代化开发流程中,Docker因其轻量化、环境隔离和跨平台一致性的特性,已成为部署Python应用的标准工具,本文将详细演示如何通过Docker容... 目录引言一、docker与python的协同优势二、核心步骤详解三、进阶配置技巧四、生产环境最佳实践

SpringBoot+RustFS 实现文件切片极速上传的实例代码

《SpringBoot+RustFS实现文件切片极速上传的实例代码》本文介绍利用SpringBoot和RustFS构建高性能文件切片上传系统,实现大文件秒传、断点续传和分片上传等功能,具有一定的参考... 目录一、为什么选择 RustFS + SpringBoot?二、环境准备与部署2.1 安装 RustF

java.sql.SQLTransientConnectionException连接超时异常原因及解决方案

《java.sql.SQLTransientConnectionException连接超时异常原因及解决方案》:本文主要介绍java.sql.SQLTransientConnectionExcep... 目录一、引言二、异常信息分析三、可能的原因3.1 连接池配置不合理3.2 数据库负载过高3.3 连接泄漏

Python实现Excel批量样式修改器(附完整代码)

《Python实现Excel批量样式修改器(附完整代码)》这篇文章主要为大家详细介绍了如何使用Python实现一个Excel批量样式修改器,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一... 目录前言功能特性核心功能界面特性系统要求安装说明使用指南基本操作流程高级功能技术实现核心技术栈关键函

python获取指定名字的程序的文件路径的两种方法

《python获取指定名字的程序的文件路径的两种方法》本文主要介绍了python获取指定名字的程序的文件路径的两种方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要... 最近在做项目,需要用到给定一个程序名字就可以自动获取到这个程序在Windows系统下的绝对路径,以下

使用Python批量将.ncm格式的音频文件转换为.mp3格式的实战详解

《使用Python批量将.ncm格式的音频文件转换为.mp3格式的实战详解》本文详细介绍了如何使用Python通过ncmdump工具批量将.ncm音频转换为.mp3的步骤,包括安装、配置ffmpeg环... 目录1. 前言2. 安装 ncmdump3. 实现 .ncm 转 .mp34. 执行过程5. 执行结