Python并行处理实战之如何使用ProcessPoolExecutor加速计算

本文主要是介绍Python并行处理实战之如何使用ProcessPoolExecutor加速计算,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

《Python并行处理实战之如何使用ProcessPoolExecutor加速计算》Python提供了多种并行处理的方式,其中concurrent.futures模块的ProcessPoolExecu...

简介

在现代计算中,并行处理是提高程序性能的重要手段。python提供了多种并行处理的方式,其中concurrent.futures模块的ProcessPoolExecutor是一个非常强大且易于使用的工具。本文将通过一个实际示例,展示如何使用ProcessPoolExecutor进行并行处理,并详细解释代码的工作原理。

完整代码示例

import time
import multiprocessing
from concurrent.futures javascriptimport ProcessPoolExecutor, as_completed
from typing import List
def process_numbers(chunk: List[int], factor: int) -> str:
    """
    处理数字的函数,通过将它们乘以因子来模拟处理。
    这个函数接受一个数字列表和一个因子,计算列表中每个数字乘以因子的和,
    并返回结果字符串。
    """
    result = sum(x * factor for x in chunk)
    time.sleep(0.1)  # 使用睡眠模拟工作
    return f"处理的块和: {result}"
def main(numbers: List[int] = None, num_chunks: int = 10, factor: int = 2):
    """
    演示并行处理的主函数。
    这个函数负责设置日志记录、生成数字列表、确定最佳工作进程数量、
    将数字分成块,并使用ProcessPoolExecutor进行并行处理。
    """
    import logging
    logging.basicConfig(level=logging.INFO)
    _log = logging.getLogger(__name__)
    # 如果没有提供数字,则生成示例列表
    if numbers is None:
        numbers = list(range(1, 101))  # 生成1到100的数字
    total_numbers = len(numbers)
    _log.info(f"开始并行处理 {total_numbers} 个数字")
    cpu_count = multiprocessing.cpu_count()
    _log.info(f"检测到 {cpu_count} 个CPU核心")
    # 确定最佳工作进程数量
    optimal_workers = min(cpu_count, num_chunks)
    _log.info(f"使用 {optimal_workers} 个工作进程")
    # 计算块大小
    chunk_size = max(1, total_numbers // optimal_workers)
    _log.info(f"每个块包含 {chunk_size} 个数字")
    # 将数字分成块
    chunks = [numbers[i:i + chunk_size] for i in range(0, totapythonl_numbers, chunk_size)]
    _log.info(f"总共生成了 {len(chunks)} 个块")
    start_time = time.time()
    processed_count = 0
    # 使用ProcessPoolExecutor进行并行处理
    with ProcessPoolExecutor(max_workers=optimal_workers) as executor:
        _log.info("启动ProcessPoolExecutor")
        # 提交所有任务
        futures = [executor.submit(process_numbers, chunk, factor) for chunk in chunks]
        _log.info(f"提交了 {len(futures)} 个任务")
        # 等待完成并收集结果
        for future in as_completed(futures):
            try:
                result = future.result()
                processed_count += 1
                _log.info(f"{'#'*50}\n{result} ({processed_count}/{len(chunks)} 总计)\n{'#'*50}")
            except Exception as e:
                _log.error(f"处理块时出错: {str(e)}")
                raise
    elapsed_time = time.time() - start_time
    _log.info(f"并行处理完成,耗时 {elapsed_time:.2f} 秒。")
if __name__ == "__main__":
    # 使用数字列表的示例
    main()

代码解释

1. 导入必要的模块

import time
import multiprocessing
from concurrent.futures import ProcessPoolExecutor, as_completed
from typing import List

这些模块提供了我们需要的并行处理功能和类型提示。

2. 定义php处理函数

def process_numbers(chunk: List[int], factor: int) -> str:
    """
    处理数字的函数,通过将它们乘以因子来模拟处理。
    这个函数接受一个数字列表和一个因子,计算列表中每个数字乘以因子的和,
    并返回结果字符串。
    """
    result = sum(x * factor for x in chunk)
    time.sleep(0.1)  # 使用睡眠模拟工作
    return f"处理的块和: {result}"

这个函数模拟了对数字列表的处理,通过将每个数字乘以一个因子并求和。time.sleep(0.1)用于模拟实际工作。

3. 主函数

def main(numbers: List[int] = None, num_chunks: int = 10, factor: int = 2):
    """
    演示并行处理的主函数。
    这个函数负责设置日志记录、生成数字列表、确定最佳工作进程数量、
    将数字分成块,并使用ProcessPoolExecutor进行并行处理。
    """
    import logging
    logging.basicConfig(level=logging.INFO)
    _log = logging.getLogger(__name__)

主函数负责设置日志记录、生成数字列表、确定最佳工作进程数量、将数字分成块,并使用ProcessPoolExecutor进行并行处理。

4. 生成数字列表

    # 如果没有提供数字,则生成示例列表
    if numbers is None:
        numbers = list(range(1, 101))  # 生成1到100的数字

如果没有提供数字列表,则生成1到100的数字列表。

5. 确定最佳工作进程数量

    cpu_count = multiprocessing.cpu_count()
    _log.info(f"检测到 {cpu_count} 个CPU核心")
    # 确定最佳工作进程数量
    optimal_workers = min(cpu_count, num_chunks)
    _log.info(f"使用 {optimal_workers} 个工作进程")

根据CPU核心数和用户指定的块数,确定最佳工作进程数量。

6. 将数字分成块

    # 计算块大小
    androidchunk_size = max(1, total_numbers // optimal_workers)
    _log.info(f"每个块包含 {chunk_size} 个数字")
    # 将数字分成块
    chunks = [numbers[i:i + chunk_size] for i in range(0, total_numbers, chunk_size)]
    _log.info(f"总共生成了 {len(chunks)} 个块")

将数字列表分成多个块,每个块的大小根据总数和工作进程数量计算。

7. 并行处理

    start_time = time.time()
    processed_count = 0
    # 使用ProcessPoolExecutor进行并行处理
    with ProcessPoolExecutor(max_workers=optimal_workers) as executor:
        _log.info("启动ProcessPoolExecutor")
        # 提交所有任务
        futures = [executor.submit(process_numbers, chunk, factor) for chunk in chunks]
        _log.info(f"提交了 {len(futures)} 个任务")
        # 等待完成并收集结果
        for future in as_completed(futures):
            try:
                result = future.result()
                processed_count += 1
                _log.info(f"{'#'*50}\n{result} ({processed_count}/{len(chunks)} 总计)\n{'#'*50}")
            except Exception as e:
                _log.error(f"处理块时出错: {str(e)}")
                raise

使用ProcessPoolExecutor进行并行处理,提交所有任务并等待完成。

8. 计算耗时

    elapsed_time = time.time() - start_time
    _log.info(f"并行处理完成,耗时 {elapsed_time:.2f} 秒。")

计算并行处理的总耗时并输出。

并行处理的基本概念和优势

并行处理是指同时执行多个任务,以提高程序的执行效率。Python的concurrent.futures模块提供了一个高级接口,用于并行执行任务。ProcessPoolExecutor是其中一个重要的类,它使用多进程来并行执行任务。

并行处理的优势包括:

  • 提高程序的执行效率
  • 充分利用多核CPU的计算能力
  • 简化多线程或多进程编程的复杂性

如何运行和测试这个示例

  • 将上述代码保存为parallhttp://www.chinasem.cnel_processing_example.py文件。
  • 确保你的Python环境中安装了必要的模块(本示例不需要额外安装模块)。
  • 在终端或命令行中运行以下命令:
python parallel_processing_example.py

你将看到程序的执行过程和并行处理的结果。

总结

通过这个示例,我们展示了如何使用Python的ProcessPoolExecutor进行并行处理。并行处理是提高程序性能的重要手段,特别是在处理大量数据或计算密集型任务时。希望这个示例能帮助你更好地理解并行处理的概念和实现。

到此这篇关于Python并行处理实战之如何使用ProcessPoolExecutor加速计算的文章就介绍到这了,更多相关Python ProcessPoolExecutor加速计算内容请搜索China编程(www.chinasem.cn)以前的文章或继续浏览下面的相关文章希望大家以后多多支持China编程(www.chinasem.cn)!

这篇关于Python并行处理实战之如何使用ProcessPoolExecutor加速计算的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1155058

相关文章

java中pdf模版填充表单踩坑实战记录(itextPdf、openPdf、pdfbox)

《java中pdf模版填充表单踩坑实战记录(itextPdf、openPdf、pdfbox)》:本文主要介绍java中pdf模版填充表单踩坑的相关资料,OpenPDF、iText、PDFBox是三... 目录准备Pdf模版方法1:itextpdf7填充表单(1)加入依赖(2)代码(3)遇到的问题方法2:pd

Python操作PDF文档的主流库使用指南

《Python操作PDF文档的主流库使用指南》PDF因其跨平台、格式固定的特性成为文档交换的标准,然而,由于其复杂的内部结构,程序化操作PDF一直是个挑战,本文主要为大家整理了Python操作PD... 目录一、 基础操作1.PyPDF2 (及其继任者 pypdf)2.PyMuPDF / fitz3.Fre

python设置环境变量路径实现过程

《python设置环境变量路径实现过程》本文介绍设置Python路径的多种方法:临时设置(Windows用`set`,Linux/macOS用`export`)、永久设置(系统属性或shell配置文件... 目录设置python路径的方法临时设置环境变量(适用于当前会话)永久设置环境变量(Windows系统

python中列表应用和扩展性实用详解

《python中列表应用和扩展性实用详解》文章介绍了Python列表的核心特性:有序数据集合,用[]定义,元素类型可不同,支持迭代、循环、切片,可执行增删改查、排序、推导式及嵌套操作,是常用的数据处理... 目录1、列表定义2、格式3、列表是可迭代对象4、列表的常见操作总结1、列表定义是处理一组有序项目的

python运用requests模拟浏览器发送请求过程

《python运用requests模拟浏览器发送请求过程》模拟浏览器请求可选用requests处理静态内容,selenium应对动态页面,playwright支持高级自动化,设置代理和超时参数,根据需... 目录使用requests库模拟浏览器请求使用selenium自动化浏览器操作使用playwright

python使用try函数详解

《python使用try函数详解》Pythontry语句用于异常处理,支持捕获特定/多种异常、else/final子句确保资源释放,结合with语句自动清理,可自定义异常及嵌套结构,灵活应对错误场景... 目录try 函数的基本语法捕获特定异常捕获多个异常使用 else 子句使用 finally 子句捕获所

Python极速搭建局域网文件共享服务器完整指南

《Python极速搭建局域网文件共享服务器完整指南》在办公室或家庭局域网中快速共享文件时,许多人会选择第三方工具或云存储服务,但这些方案往往存在隐私泄露风险或需要复杂配置,下面我们就来看看如何使用Py... 目录一、android基础版:HTTP文件共享的魔法命令1. 一行代码启动HTTP服务器2. 关键参

C++11右值引用与Lambda表达式的使用

《C++11右值引用与Lambda表达式的使用》C++11引入右值引用,实现移动语义提升性能,支持资源转移与完美转发;同时引入Lambda表达式,简化匿名函数定义,通过捕获列表和参数列表灵活处理变量... 目录C++11新特性右值引用和移动语义左值 / 右值常见的左值和右值移动语义移动构造函数移动复制运算符

Python对接支付宝支付之使用AliPay实现的详细操作指南

《Python对接支付宝支付之使用AliPay实现的详细操作指南》支付宝没有提供PythonSDK,但是强大的github就有提供python-alipay-sdk,封装里很多复杂操作,使用这个我们就... 目录一、引言二、准备工作2.1 支付宝开放平台入驻与应用创建2.2 密钥生成与配置2.3 安装ali

C#中lock关键字的使用小结

《C#中lock关键字的使用小结》在C#中,lock关键字用于确保当一个线程位于给定实例的代码块中时,其他线程无法访问同一实例的该代码块,下面就来介绍一下lock关键字的使用... 目录使用方式工作原理注意事项示例代码为什么不能lock值类型在C#中,lock关键字用于确保当一个线程位于给定实例的代码块中时