Python并行处理实战之如何使用ProcessPoolExecutor加速计算

本文主要是介绍Python并行处理实战之如何使用ProcessPoolExecutor加速计算,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

《Python并行处理实战之如何使用ProcessPoolExecutor加速计算》Python提供了多种并行处理的方式,其中concurrent.futures模块的ProcessPoolExecu...

简介

在现代计算中,并行处理是提高程序性能的重要手段。python提供了多种并行处理的方式,其中concurrent.futures模块的ProcessPoolExecutor是一个非常强大且易于使用的工具。本文将通过一个实际示例,展示如何使用ProcessPoolExecutor进行并行处理,并详细解释代码的工作原理。

完整代码示例

import time
import multiprocessing
from concurrent.futures javascriptimport ProcessPoolExecutor, as_completed
from typing import List
def process_numbers(chunk: List[int], factor: int) -> str:
    """
    处理数字的函数,通过将它们乘以因子来模拟处理。
    这个函数接受一个数字列表和一个因子,计算列表中每个数字乘以因子的和,
    并返回结果字符串。
    """
    result = sum(x * factor for x in chunk)
    time.sleep(0.1)  # 使用睡眠模拟工作
    return f"处理的块和: {result}"
def main(numbers: List[int] = None, num_chunks: int = 10, factor: int = 2):
    """
    演示并行处理的主函数。
    这个函数负责设置日志记录、生成数字列表、确定最佳工作进程数量、
    将数字分成块,并使用ProcessPoolExecutor进行并行处理。
    """
    import logging
    logging.basicConfig(level=logging.INFO)
    _log = logging.getLogger(__name__)
    # 如果没有提供数字,则生成示例列表
    if numbers is None:
        numbers = list(range(1, 101))  # 生成1到100的数字
    total_numbers = len(numbers)
    _log.info(f"开始并行处理 {total_numbers} 个数字")
    cpu_count = multiprocessing.cpu_count()
    _log.info(f"检测到 {cpu_count} 个CPU核心")
    # 确定最佳工作进程数量
    optimal_workers = min(cpu_count, num_chunks)
    _log.info(f"使用 {optimal_workers} 个工作进程")
    # 计算块大小
    chunk_size = max(1, total_numbers // optimal_workers)
    _log.info(f"每个块包含 {chunk_size} 个数字")
    # 将数字分成块
    chunks = [numbers[i:i + chunk_size] for i in range(0, totapythonl_numbers, chunk_size)]
    _log.info(f"总共生成了 {len(chunks)} 个块")
    start_time = time.time()
    processed_count = 0
    # 使用ProcessPoolExecutor进行并行处理
    with ProcessPoolExecutor(max_workers=optimal_workers) as executor:
        _log.info("启动ProcessPoolExecutor")
        # 提交所有任务
        futures = [executor.submit(process_numbers, chunk, factor) for chunk in chunks]
        _log.info(f"提交了 {len(futures)} 个任务")
        # 等待完成并收集结果
        for future in as_completed(futures):
            try:
                result = future.result()
                processed_count += 1
                _log.info(f"{'#'*50}\n{result} ({processed_count}/{len(chunks)} 总计)\n{'#'*50}")
            except Exception as e:
                _log.error(f"处理块时出错: {str(e)}")
                raise
    elapsed_time = time.time() - start_time
    _log.info(f"并行处理完成,耗时 {elapsed_time:.2f} 秒。")
if __name__ == "__main__":
    # 使用数字列表的示例
    main()

代码解释

1. 导入必要的模块

import time
import multiprocessing
from concurrent.futures import ProcessPoolExecutor, as_completed
from typing import List

这些模块提供了我们需要的并行处理功能和类型提示。

2. 定义php处理函数

def process_numbers(chunk: List[int], factor: int) -> str:
    """
    处理数字的函数,通过将它们乘以因子来模拟处理。
    这个函数接受一个数字列表和一个因子,计算列表中每个数字乘以因子的和,
    并返回结果字符串。
    """
    result = sum(x * factor for x in chunk)
    time.sleep(0.1)  # 使用睡眠模拟工作
    return f"处理的块和: {result}"

这个函数模拟了对数字列表的处理,通过将每个数字乘以一个因子并求和。time.sleep(0.1)用于模拟实际工作。

3. 主函数

def main(numbers: List[int] = None, num_chunks: int = 10, factor: int = 2):
    """
    演示并行处理的主函数。
    这个函数负责设置日志记录、生成数字列表、确定最佳工作进程数量、
    将数字分成块,并使用ProcessPoolExecutor进行并行处理。
    """
    import logging
    logging.basicConfig(level=logging.INFO)
    _log = logging.getLogger(__name__)

主函数负责设置日志记录、生成数字列表、确定最佳工作进程数量、将数字分成块,并使用ProcessPoolExecutor进行并行处理。

4. 生成数字列表

    # 如果没有提供数字,则生成示例列表
    if numbers is None:
        numbers = list(range(1, 101))  # 生成1到100的数字

如果没有提供数字列表,则生成1到100的数字列表。

5. 确定最佳工作进程数量

    cpu_count = multiprocessing.cpu_count()
    _log.info(f"检测到 {cpu_count} 个CPU核心")
    # 确定最佳工作进程数量
    optimal_workers = min(cpu_count, num_chunks)
    _log.info(f"使用 {optimal_workers} 个工作进程")

根据CPU核心数和用户指定的块数,确定最佳工作进程数量。

6. 将数字分成块

    # 计算块大小
    androidchunk_size = max(1, total_numbers // optimal_workers)
    _log.info(f"每个块包含 {chunk_size} 个数字")
    # 将数字分成块
    chunks = [numbers[i:i + chunk_size] for i in range(0, total_numbers, chunk_size)]
    _log.info(f"总共生成了 {len(chunks)} 个块")

将数字列表分成多个块,每个块的大小根据总数和工作进程数量计算。

7. 并行处理

    start_time = time.time()
    processed_count = 0
    # 使用ProcessPoolExecutor进行并行处理
    with ProcessPoolExecutor(max_workers=optimal_workers) as executor:
        _log.info("启动ProcessPoolExecutor")
        # 提交所有任务
        futures = [executor.submit(process_numbers, chunk, factor) for chunk in chunks]
        _log.info(f"提交了 {len(futures)} 个任务")
        # 等待完成并收集结果
        for future in as_completed(futures):
            try:
                result = future.result()
                processed_count += 1
                _log.info(f"{'#'*50}\n{result} ({processed_count}/{len(chunks)} 总计)\n{'#'*50}")
            except Exception as e:
                _log.error(f"处理块时出错: {str(e)}")
                raise

使用ProcessPoolExecutor进行并行处理,提交所有任务并等待完成。

8. 计算耗时

    elapsed_time = time.time() - start_time
    _log.info(f"并行处理完成,耗时 {elapsed_time:.2f} 秒。")

计算并行处理的总耗时并输出。

并行处理的基本概念和优势

并行处理是指同时执行多个任务,以提高程序的执行效率。Python的concurrent.futures模块提供了一个高级接口,用于并行执行任务。ProcessPoolExecutor是其中一个重要的类,它使用多进程来并行执行任务。

并行处理的优势包括:

  • 提高程序的执行效率
  • 充分利用多核CPU的计算能力
  • 简化多线程或多进程编程的复杂性

如何运行和测试这个示例

  • 将上述代码保存为parallhttp://www.chinasem.cnel_processing_example.py文件。
  • 确保你的Python环境中安装了必要的模块(本示例不需要额外安装模块)。
  • 在终端或命令行中运行以下命令:
python parallel_processing_example.py

你将看到程序的执行过程和并行处理的结果。

总结

通过这个示例,我们展示了如何使用Python的ProcessPoolExecutor进行并行处理。并行处理是提高程序性能的重要手段,特别是在处理大量数据或计算密集型任务时。希望这个示例能帮助你更好地理解并行处理的概念和实现。

到此这篇关于Python并行处理实战之如何使用ProcessPoolExecutor加速计算的文章就介绍到这了,更多相关Python ProcessPoolExecutor加速计算内容请搜索China编程(www.chinasem.cn)以前的文章或继续浏览下面的相关文章希望大家以后多多支持China编程(www.chinasem.cn)!

这篇关于Python并行处理实战之如何使用ProcessPoolExecutor加速计算的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1155058

相关文章

Java中流式并行操作parallelStream的原理和使用方法

《Java中流式并行操作parallelStream的原理和使用方法》本文详细介绍了Java中的并行流(parallelStream)的原理、正确使用方法以及在实际业务中的应用案例,并指出在使用并行流... 目录Java中流式并行操作parallelStream0. 问题的产生1. 什么是parallelS

Linux join命令的使用及说明

《Linuxjoin命令的使用及说明》`join`命令用于在Linux中按字段将两个文件进行连接,类似于SQL的JOIN,它需要两个文件按用于匹配的字段排序,并且第一个文件的换行符必须是LF,`jo... 目录一. 基本语法二. 数据准备三. 指定文件的连接key四.-a输出指定文件的所有行五.-o指定输出

Linux jq命令的使用解读

《Linuxjq命令的使用解读》jq是一个强大的命令行工具,用于处理JSON数据,它可以用来查看、过滤、修改、格式化JSON数据,通过使用各种选项和过滤器,可以实现复杂的JSON处理任务... 目录一. 简介二. 选项2.1.2.2-c2.3-r2.4-R三. 字段提取3.1 普通字段3.2 数组字段四.

Linux kill正在执行的后台任务 kill进程组使用详解

《Linuxkill正在执行的后台任务kill进程组使用详解》文章介绍了两个脚本的功能和区别,以及执行这些脚本时遇到的进程管理问题,通过查看进程树、使用`kill`命令和`lsof`命令,分析了子... 目录零. 用到的命令一. 待执行的脚本二. 执行含子进程的脚本,并kill2.1 进程查看2.2 遇到的

详解SpringBoot+Ehcache使用示例

《详解SpringBoot+Ehcache使用示例》本文介绍了SpringBoot中配置Ehcache、自定义get/set方式,并实际使用缓存的过程,文中通过示例代码介绍的非常详细,对大家的学习或者... 目录摘要概念内存与磁盘持久化存储:配置灵活性:编码示例引入依赖:配置ehcache.XML文件:配置

Java 虚拟线程的创建与使用深度解析

《Java虚拟线程的创建与使用深度解析》虚拟线程是Java19中以预览特性形式引入,Java21起正式发布的轻量级线程,本文给大家介绍Java虚拟线程的创建与使用,感兴趣的朋友一起看看吧... 目录一、虚拟线程简介1.1 什么是虚拟线程?1.2 为什么需要虚拟线程?二、虚拟线程与平台线程对比代码对比示例:三

k8s按需创建PV和使用PVC详解

《k8s按需创建PV和使用PVC详解》Kubernetes中,PV和PVC用于管理持久存储,StorageClass实现动态PV分配,PVC声明存储需求并绑定PV,通过kubectl验证状态,注意回收... 目录1.按需创建 PV(使用 StorageClass)创建 StorageClass2.创建 PV

Python版本信息获取方法详解与实战

《Python版本信息获取方法详解与实战》在Python开发中,获取Python版本号是调试、兼容性检查和版本控制的重要基础操作,本文详细介绍了如何使用sys和platform模块获取Python的主... 目录1. python版本号获取基础2. 使用sys模块获取版本信息2.1 sys模块概述2.1.1

一文详解Python如何开发游戏

《一文详解Python如何开发游戏》Python是一种非常流行的编程语言,也可以用来开发游戏模组,:本文主要介绍Python如何开发游戏的相关资料,文中通过代码介绍的非常详细,需要的朋友可以参考下... 目录一、python简介二、Python 开发 2D 游戏的优劣势优势缺点三、Python 开发 3D

Python函数作用域与闭包举例深度解析

《Python函数作用域与闭包举例深度解析》Python函数的作用域规则和闭包是编程中的关键概念,它们决定了变量的访问和生命周期,:本文主要介绍Python函数作用域与闭包的相关资料,文中通过代码... 目录1. 基础作用域访问示例1:访问全局变量示例2:访问外层函数变量2. 闭包基础示例3:简单闭包示例4