Python并行处理实战之如何使用ProcessPoolExecutor加速计算

本文主要是介绍Python并行处理实战之如何使用ProcessPoolExecutor加速计算,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

《Python并行处理实战之如何使用ProcessPoolExecutor加速计算》Python提供了多种并行处理的方式,其中concurrent.futures模块的ProcessPoolExecu...

简介

在现代计算中,并行处理是提高程序性能的重要手段。python提供了多种并行处理的方式,其中concurrent.futures模块的ProcessPoolExecutor是一个非常强大且易于使用的工具。本文将通过一个实际示例,展示如何使用ProcessPoolExecutor进行并行处理,并详细解释代码的工作原理。

完整代码示例

import time
import multiprocessing
from concurrent.futures javascriptimport ProcessPoolExecutor, as_completed
from typing import List
def process_numbers(chunk: List[int], factor: int) -> str:
    """
    处理数字的函数,通过将它们乘以因子来模拟处理。
    这个函数接受一个数字列表和一个因子,计算列表中每个数字乘以因子的和,
    并返回结果字符串。
    """
    result = sum(x * factor for x in chunk)
    time.sleep(0.1)  # 使用睡眠模拟工作
    return f"处理的块和: {result}"
def main(numbers: List[int] = None, num_chunks: int = 10, factor: int = 2):
    """
    演示并行处理的主函数。
    这个函数负责设置日志记录、生成数字列表、确定最佳工作进程数量、
    将数字分成块,并使用ProcessPoolExecutor进行并行处理。
    """
    import logging
    logging.basicConfig(level=logging.INFO)
    _log = logging.getLogger(__name__)
    # 如果没有提供数字,则生成示例列表
    if numbers is None:
        numbers = list(range(1, 101))  # 生成1到100的数字
    total_numbers = len(numbers)
    _log.info(f"开始并行处理 {total_numbers} 个数字")
    cpu_count = multiprocessing.cpu_count()
    _log.info(f"检测到 {cpu_count} 个CPU核心")
    # 确定最佳工作进程数量
    optimal_workers = min(cpu_count, num_chunks)
    _log.info(f"使用 {optimal_workers} 个工作进程")
    # 计算块大小
    chunk_size = max(1, total_numbers // optimal_workers)
    _log.info(f"每个块包含 {chunk_size} 个数字")
    # 将数字分成块
    chunks = [numbers[i:i + chunk_size] for i in range(0, totapythonl_numbers, chunk_size)]
    _log.info(f"总共生成了 {len(chunks)} 个块")
    start_time = time.time()
    processed_count = 0
    # 使用ProcessPoolExecutor进行并行处理
    with ProcessPoolExecutor(max_workers=optimal_workers) as executor:
        _log.info("启动ProcessPoolExecutor")
        # 提交所有任务
        futures = [executor.submit(process_numbers, chunk, factor) for chunk in chunks]
        _log.info(f"提交了 {len(futures)} 个任务")
        # 等待完成并收集结果
        for future in as_completed(futures):
            try:
                result = future.result()
                processed_count += 1
                _log.info(f"{'#'*50}\n{result} ({processed_count}/{len(chunks)} 总计)\n{'#'*50}")
            except Exception as e:
                _log.error(f"处理块时出错: {str(e)}")
                raise
    elapsed_time = time.time() - start_time
    _log.info(f"并行处理完成,耗时 {elapsed_time:.2f} 秒。")
if __name__ == "__main__":
    # 使用数字列表的示例
    main()

代码解释

1. 导入必要的模块

import time
import multiprocessing
from concurrent.futures import ProcessPoolExecutor, as_completed
from typing import List

这些模块提供了我们需要的并行处理功能和类型提示。

2. 定义php处理函数

def process_numbers(chunk: List[int], factor: int) -> str:
    """
    处理数字的函数,通过将它们乘以因子来模拟处理。
    这个函数接受一个数字列表和一个因子,计算列表中每个数字乘以因子的和,
    并返回结果字符串。
    """
    result = sum(x * factor for x in chunk)
    time.sleep(0.1)  # 使用睡眠模拟工作
    return f"处理的块和: {result}"

这个函数模拟了对数字列表的处理,通过将每个数字乘以一个因子并求和。time.sleep(0.1)用于模拟实际工作。

3. 主函数

def main(numbers: List[int] = None, num_chunks: int = 10, factor: int = 2):
    """
    演示并行处理的主函数。
    这个函数负责设置日志记录、生成数字列表、确定最佳工作进程数量、
    将数字分成块,并使用ProcessPoolExecutor进行并行处理。
    """
    import logging
    logging.basicConfig(level=logging.INFO)
    _log = logging.getLogger(__name__)

主函数负责设置日志记录、生成数字列表、确定最佳工作进程数量、将数字分成块,并使用ProcessPoolExecutor进行并行处理。

4. 生成数字列表

    # 如果没有提供数字,则生成示例列表
    if numbers is None:
        numbers = list(range(1, 101))  # 生成1到100的数字

如果没有提供数字列表,则生成1到100的数字列表。

5. 确定最佳工作进程数量

    cpu_count = multiprocessing.cpu_count()
    _log.info(f"检测到 {cpu_count} 个CPU核心")
    # 确定最佳工作进程数量
    optimal_workers = min(cpu_count, num_chunks)
    _log.info(f"使用 {optimal_workers} 个工作进程")

根据CPU核心数和用户指定的块数,确定最佳工作进程数量。

6. 将数字分成块

    # 计算块大小
    androidchunk_size = max(1, total_numbers // optimal_workers)
    _log.info(f"每个块包含 {chunk_size} 个数字")
    # 将数字分成块
    chunks = [numbers[i:i + chunk_size] for i in range(0, total_numbers, chunk_size)]
    _log.info(f"总共生成了 {len(chunks)} 个块")

将数字列表分成多个块,每个块的大小根据总数和工作进程数量计算。

7. 并行处理

    start_time = time.time()
    processed_count = 0
    # 使用ProcessPoolExecutor进行并行处理
    with ProcessPoolExecutor(max_workers=optimal_workers) as executor:
        _log.info("启动ProcessPoolExecutor")
        # 提交所有任务
        futures = [executor.submit(process_numbers, chunk, factor) for chunk in chunks]
        _log.info(f"提交了 {len(futures)} 个任务")
        # 等待完成并收集结果
        for future in as_completed(futures):
            try:
                result = future.result()
                processed_count += 1
                _log.info(f"{'#'*50}\n{result} ({processed_count}/{len(chunks)} 总计)\n{'#'*50}")
            except Exception as e:
                _log.error(f"处理块时出错: {str(e)}")
                raise

使用ProcessPoolExecutor进行并行处理,提交所有任务并等待完成。

8. 计算耗时

    elapsed_time = time.time() - start_time
    _log.info(f"并行处理完成,耗时 {elapsed_time:.2f} 秒。")

计算并行处理的总耗时并输出。

并行处理的基本概念和优势

并行处理是指同时执行多个任务,以提高程序的执行效率。Python的concurrent.futures模块提供了一个高级接口,用于并行执行任务。ProcessPoolExecutor是其中一个重要的类,它使用多进程来并行执行任务。

并行处理的优势包括:

  • 提高程序的执行效率
  • 充分利用多核CPU的计算能力
  • 简化多线程或多进程编程的复杂性

如何运行和测试这个示例

  • 将上述代码保存为parallhttp://www.chinasem.cnel_processing_example.py文件。
  • 确保你的Python环境中安装了必要的模块(本示例不需要额外安装模块)。
  • 在终端或命令行中运行以下命令:
python parallel_processing_example.py

你将看到程序的执行过程和并行处理的结果。

总结

通过这个示例,我们展示了如何使用Python的ProcessPoolExecutor进行并行处理。并行处理是提高程序性能的重要手段,特别是在处理大量数据或计算密集型任务时。希望这个示例能帮助你更好地理解并行处理的概念和实现。

到此这篇关于Python并行处理实战之如何使用ProcessPoolExecutor加速计算的文章就介绍到这了,更多相关Python ProcessPoolExecutor加速计算内容请搜索China编程(www.chinasem.cn)以前的文章或继续浏览下面的相关文章希望大家以后多多支持China编程(www.chinasem.cn)!

这篇关于Python并行处理实战之如何使用ProcessPoolExecutor加速计算的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1155058

相关文章

postgresql使用UUID函数的方法

《postgresql使用UUID函数的方法》本文给大家介绍postgresql使用UUID函数的方法,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录PostgreSQL有两种生成uuid的方法。可以先通过sql查看是否已安装扩展函数,和可以安装的扩展函数

Python实现终端清屏的几种方式详解

《Python实现终端清屏的几种方式详解》在使用Python进行终端交互式编程时,我们经常需要清空当前终端屏幕的内容,本文为大家整理了几种常见的实现方法,有需要的小伙伴可以参考下... 目录方法一:使用 `os` 模块调用系统命令方法二:使用 `subprocess` 模块执行命令方法三:打印多个换行符模拟

Python实现MQTT通信的示例代码

《Python实现MQTT通信的示例代码》本文主要介绍了Python实现MQTT通信的示例代码,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一... 目录1. 安装paho-mqtt库‌2. 搭建MQTT代理服务器(Broker)‌‌3. pytho

如何使用Lombok进行spring 注入

《如何使用Lombok进行spring注入》本文介绍如何用Lombok简化Spring注入,推荐优先使用setter注入,通过注解自动生成getter/setter及构造器,减少冗余代码,提升开发效... Lombok为了开发环境简化代码,好处不用多说。spring 注入方式为2种,构造器注入和setter

基于Python开发一个图像水印批量添加工具

《基于Python开发一个图像水印批量添加工具》在当今数字化内容爆炸式增长的时代,图像版权保护已成为创作者和企业的核心需求,本方案将详细介绍一个基于PythonPIL库的工业级图像水印解决方案,有需要... 目录一、系统架构设计1.1 整体处理流程1.2 类结构设计(扩展版本)二、核心算法深入解析2.1 自

MySQL中比较运算符的具体使用

《MySQL中比较运算符的具体使用》本文介绍了SQL中常用的符号类型和非符号类型运算符,符号类型运算符包括等于(=)、安全等于(=)、不等于(/!=)、大小比较(,=,,=)等,感兴趣的可以了解一下... 目录符号类型运算符1. 等于运算符=2. 安全等于运算符<=>3. 不等于运算符<>或!=4. 小于运

使用zip4j实现Java中的ZIP文件加密压缩的操作方法

《使用zip4j实现Java中的ZIP文件加密压缩的操作方法》本文介绍如何通过Maven集成zip4j1.3.2库创建带密码保护的ZIP文件,涵盖依赖配置、代码示例及加密原理,确保数据安全性,感兴趣的... 目录1. zip4j库介绍和版本1.1 zip4j库概述1.2 zip4j的版本演变1.3 zip4

从入门到进阶讲解Python自动化Playwright实战指南

《从入门到进阶讲解Python自动化Playwright实战指南》Playwright是针对Python语言的纯自动化工具,它可以通过单个API自动执行Chromium,Firefox和WebKit... 目录Playwright 简介核心优势安装步骤观点与案例结合Playwright 核心功能从零开始学习

Python 字典 (Dictionary)使用详解

《Python字典(Dictionary)使用详解》字典是python中最重要,最常用的数据结构之一,它提供了高效的键值对存储和查找能力,:本文主要介绍Python字典(Dictionary)... 目录字典1.基本特性2.创建字典3.访问元素4.修改字典5.删除元素6.字典遍历7.字典的高级特性默认字典

Python自动化批量重命名与整理文件系统

《Python自动化批量重命名与整理文件系统》这篇文章主要为大家详细介绍了如何使用Python实现一个强大的文件批量重命名与整理工具,帮助开发者自动化这一繁琐过程,有需要的小伙伴可以了解下... 目录简介环境准备项目功能概述代码详细解析1. 导入必要的库2. 配置参数设置3. 创建日志系统4. 安全文件名处