Python并行处理实战之如何使用ProcessPoolExecutor加速计算

本文主要是介绍Python并行处理实战之如何使用ProcessPoolExecutor加速计算,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

《Python并行处理实战之如何使用ProcessPoolExecutor加速计算》Python提供了多种并行处理的方式,其中concurrent.futures模块的ProcessPoolExecu...

简介

在现代计算中,并行处理是提高程序性能的重要手段。python提供了多种并行处理的方式,其中concurrent.futures模块的ProcessPoolExecutor是一个非常强大且易于使用的工具。本文将通过一个实际示例,展示如何使用ProcessPoolExecutor进行并行处理,并详细解释代码的工作原理。

完整代码示例

import time
import multiprocessing
from concurrent.futures javascriptimport ProcessPoolExecutor, as_completed
from typing import List
def process_numbers(chunk: List[int], factor: int) -> str:
    """
    处理数字的函数,通过将它们乘以因子来模拟处理。
    这个函数接受一个数字列表和一个因子,计算列表中每个数字乘以因子的和,
    并返回结果字符串。
    """
    result = sum(x * factor for x in chunk)
    time.sleep(0.1)  # 使用睡眠模拟工作
    return f"处理的块和: {result}"
def main(numbers: List[int] = None, num_chunks: int = 10, factor: int = 2):
    """
    演示并行处理的主函数。
    这个函数负责设置日志记录、生成数字列表、确定最佳工作进程数量、
    将数字分成块,并使用ProcessPoolExecutor进行并行处理。
    """
    import logging
    logging.basicConfig(level=logging.INFO)
    _log = logging.getLogger(__name__)
    # 如果没有提供数字,则生成示例列表
    if numbers is None:
        numbers = list(range(1, 101))  # 生成1到100的数字
    total_numbers = len(numbers)
    _log.info(f"开始并行处理 {total_numbers} 个数字")
    cpu_count = multiprocessing.cpu_count()
    _log.info(f"检测到 {cpu_count} 个CPU核心")
    # 确定最佳工作进程数量
    optimal_workers = min(cpu_count, num_chunks)
    _log.info(f"使用 {optimal_workers} 个工作进程")
    # 计算块大小
    chunk_size = max(1, total_numbers // optimal_workers)
    _log.info(f"每个块包含 {chunk_size} 个数字")
    # 将数字分成块
    chunks = [numbers[i:i + chunk_size] for i in range(0, totapythonl_numbers, chunk_size)]
    _log.info(f"总共生成了 {len(chunks)} 个块")
    start_time = time.time()
    processed_count = 0
    # 使用ProcessPoolExecutor进行并行处理
    with ProcessPoolExecutor(max_workers=optimal_workers) as executor:
        _log.info("启动ProcessPoolExecutor")
        # 提交所有任务
        futures = [executor.submit(process_numbers, chunk, factor) for chunk in chunks]
        _log.info(f"提交了 {len(futures)} 个任务")
        # 等待完成并收集结果
        for future in as_completed(futures):
            try:
                result = future.result()
                processed_count += 1
                _log.info(f"{'#'*50}\n{result} ({processed_count}/{len(chunks)} 总计)\n{'#'*50}")
            except Exception as e:
                _log.error(f"处理块时出错: {str(e)}")
                raise
    elapsed_time = time.time() - start_time
    _log.info(f"并行处理完成,耗时 {elapsed_time:.2f} 秒。")
if __name__ == "__main__":
    # 使用数字列表的示例
    main()

代码解释

1. 导入必要的模块

import time
import multiprocessing
from concurrent.futures import ProcessPoolExecutor, as_completed
from typing import List

这些模块提供了我们需要的并行处理功能和类型提示。

2. 定义php处理函数

def process_numbers(chunk: List[int], factor: int) -> str:
    """
    处理数字的函数,通过将它们乘以因子来模拟处理。
    这个函数接受一个数字列表和一个因子,计算列表中每个数字乘以因子的和,
    并返回结果字符串。
    """
    result = sum(x * factor for x in chunk)
    time.sleep(0.1)  # 使用睡眠模拟工作
    return f"处理的块和: {result}"

这个函数模拟了对数字列表的处理,通过将每个数字乘以一个因子并求和。time.sleep(0.1)用于模拟实际工作。

3. 主函数

def main(numbers: List[int] = None, num_chunks: int = 10, factor: int = 2):
    """
    演示并行处理的主函数。
    这个函数负责设置日志记录、生成数字列表、确定最佳工作进程数量、
    将数字分成块,并使用ProcessPoolExecutor进行并行处理。
    """
    import logging
    logging.basicConfig(level=logging.INFO)
    _log = logging.getLogger(__name__)

主函数负责设置日志记录、生成数字列表、确定最佳工作进程数量、将数字分成块,并使用ProcessPoolExecutor进行并行处理。

4. 生成数字列表

    # 如果没有提供数字,则生成示例列表
    if numbers is None:
        numbers = list(range(1, 101))  # 生成1到100的数字

如果没有提供数字列表,则生成1到100的数字列表。

5. 确定最佳工作进程数量

    cpu_count = multiprocessing.cpu_count()
    _log.info(f"检测到 {cpu_count} 个CPU核心")
    # 确定最佳工作进程数量
    optimal_workers = min(cpu_count, num_chunks)
    _log.info(f"使用 {optimal_workers} 个工作进程")

根据CPU核心数和用户指定的块数,确定最佳工作进程数量。

6. 将数字分成块

    # 计算块大小
    androidchunk_size = max(1, total_numbers // optimal_workers)
    _log.info(f"每个块包含 {chunk_size} 个数字")
    # 将数字分成块
    chunks = [numbers[i:i + chunk_size] for i in range(0, total_numbers, chunk_size)]
    _log.info(f"总共生成了 {len(chunks)} 个块")

将数字列表分成多个块,每个块的大小根据总数和工作进程数量计算。

7. 并行处理

    start_time = time.time()
    processed_count = 0
    # 使用ProcessPoolExecutor进行并行处理
    with ProcessPoolExecutor(max_workers=optimal_workers) as executor:
        _log.info("启动ProcessPoolExecutor")
        # 提交所有任务
        futures = [executor.submit(process_numbers, chunk, factor) for chunk in chunks]
        _log.info(f"提交了 {len(futures)} 个任务")
        # 等待完成并收集结果
        for future in as_completed(futures):
            try:
                result = future.result()
                processed_count += 1
                _log.info(f"{'#'*50}\n{result} ({processed_count}/{len(chunks)} 总计)\n{'#'*50}")
            except Exception as e:
                _log.error(f"处理块时出错: {str(e)}")
                raise

使用ProcessPoolExecutor进行并行处理,提交所有任务并等待完成。

8. 计算耗时

    elapsed_time = time.time() - start_time
    _log.info(f"并行处理完成,耗时 {elapsed_time:.2f} 秒。")

计算并行处理的总耗时并输出。

并行处理的基本概念和优势

并行处理是指同时执行多个任务,以提高程序的执行效率。Python的concurrent.futures模块提供了一个高级接口,用于并行执行任务。ProcessPoolExecutor是其中一个重要的类,它使用多进程来并行执行任务。

并行处理的优势包括:

  • 提高程序的执行效率
  • 充分利用多核CPU的计算能力
  • 简化多线程或多进程编程的复杂性

如何运行和测试这个示例

  • 将上述代码保存为parallhttp://www.chinasem.cnel_processing_example.py文件。
  • 确保你的Python环境中安装了必要的模块(本示例不需要额外安装模块)。
  • 在终端或命令行中运行以下命令:
python parallel_processing_example.py

你将看到程序的执行过程和并行处理的结果。

总结

通过这个示例,我们展示了如何使用Python的ProcessPoolExecutor进行并行处理。并行处理是提高程序性能的重要手段,特别是在处理大量数据或计算密集型任务时。希望这个示例能帮助你更好地理解并行处理的概念和实现。

到此这篇关于Python并行处理实战之如何使用ProcessPoolExecutor加速计算的文章就介绍到这了,更多相关Python ProcessPoolExecutor加速计算内容请搜索China编程(www.chinasem.cn)以前的文章或继续浏览下面的相关文章希望大家以后多多支持China编程(www.chinasem.cn)!

这篇关于Python并行处理实战之如何使用ProcessPoolExecutor加速计算的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1155058

相关文章

Python中help()和dir()函数的使用

《Python中help()和dir()函数的使用》我们经常需要查看某个对象(如模块、类、函数等)的属性和方法,Python提供了两个内置函数help()和dir(),它们可以帮助我们快速了解代... 目录1. 引言2. help() 函数2.1 作用2.2 使用方法2.3 示例(1) 查看内置函数的帮助(

Python虚拟环境与Conda使用指南分享

《Python虚拟环境与Conda使用指南分享》:本文主要介绍Python虚拟环境与Conda使用指南,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、python 虚拟环境概述1.1 什么是虚拟环境1.2 为什么需要虚拟环境二、Python 内置的虚拟环境工具

Linux脚本(shell)的使用方式

《Linux脚本(shell)的使用方式》:本文主要介绍Linux脚本(shell)的使用方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录概述语法详解数学运算表达式Shell变量变量分类环境变量Shell内部变量自定义变量:定义、赋值自定义变量:引用、修改、删

Python实例题之pygame开发打飞机游戏实例代码

《Python实例题之pygame开发打飞机游戏实例代码》对于python的学习者,能够写出一个飞机大战的程序代码,是不是感觉到非常的开心,:本文主要介绍Python实例题之pygame开发打飞机... 目录题目pygame-aircraft-game使用 Pygame 开发的打飞机游戏脚本代码解释初始化部

Python pip下载包及所有依赖到指定文件夹的步骤说明

《Pythonpip下载包及所有依赖到指定文件夹的步骤说明》为了方便开发和部署,我们常常需要将Python项目所依赖的第三方包导出到本地文件夹中,:本文主要介绍Pythonpip下载包及所有依... 目录步骤说明命令格式示例参数说明离线安装方法注意事项总结要使用pip下载包及其所有依赖到指定文件夹,请按照以

Python实现精准提取 PDF中的文本,表格与图片

《Python实现精准提取PDF中的文本,表格与图片》在实际的系统开发中,处理PDF文件不仅限于读取整页文本,还有提取文档中的表格数据,图片或特定区域的内容,下面我们来看看如何使用Python实... 目录安装 python 库提取 PDF 文本内容:获取整页文本与指定区域内容获取页面上的所有文本内容获取

基于Python实现一个Windows Tree命令工具

《基于Python实现一个WindowsTree命令工具》今天想要在Windows平台的CMD命令终端窗口中使用像Linux下的tree命令,打印一下目录结构层级树,然而还真有tree命令,但是发现... 目录引言实现代码使用说明可用选项示例用法功能特点添加到环境变量方法一:创建批处理文件并添加到PATH1

Python包管理工具核心指令uvx举例详细解析

《Python包管理工具核心指令uvx举例详细解析》:本文主要介绍Python包管理工具核心指令uvx的相关资料,uvx是uv工具链中用于临时运行Python命令行工具的高效执行器,依托Rust实... 目录一、uvx 的定位与核心功能二、uvx 的典型应用场景三、uvx 与传统工具对比四、uvx 的技术实

Java使用HttpClient实现图片下载与本地保存功能

《Java使用HttpClient实现图片下载与本地保存功能》在当今数字化时代,网络资源的获取与处理已成为软件开发中的常见需求,其中,图片作为网络上最常见的资源之一,其下载与保存功能在许多应用场景中都... 目录引言一、Apache HttpClient简介二、技术栈与环境准备三、实现图片下载与保存功能1.

Python中使用uv创建环境及原理举例详解

《Python中使用uv创建环境及原理举例详解》uv是Astral团队开发的高性能Python工具,整合包管理、虚拟环境、Python版本控制等功能,:本文主要介绍Python中使用uv创建环境及... 目录一、uv工具简介核心特点:二、安装uv1. 通过pip安装2. 通过脚本安装验证安装:配置镜像源(可