haskell并发编程

2024-02-29 06:58
文章标签 并发 编程 haskell

本文主要是介绍haskell并发编程,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

线程相关


forkIO :: IO a -> IO ThreadId

forkIO是Haskell内建的函数,它的参数是一个IO动作,forkIO所做的事情就是创建一个并发的Haskell线程 来执行这个IO动作。一旦这个新线程建立,Haskell的运行时系统便会将它与其他Haskell线程并行执行。

Haskell里面由forkIO产生出来的线程是非常轻量级的:只占用几百个字节的内存,所以一个程序里面就算 产生上千个线程也是完全正常的 。(注:ForkIO creates a spark not OS thread and then the scheduler decides when sparks should be scheduled to threads. Thus you get a guarantee of concurrent but not parallel execution.)

例子:

module Main where

import Control.Concurrent.MVar
import Control.Concurrent
import Data.Maybe

timeout :: Int -> IO a -> IO (Maybe a)
timeout time action = do
      someMVar <- newEmptyMVar -- MVar is a Maybe
      timeoutThread <- forkIO $ nothingIzer time someMVar
      forkIO $ actionRunner action someMVar timeoutThread
      takeMVar someMVar >>= return
      where 
          nothingIzer time mvar = threadDelay time >> putMVar mvar Nothing
          actionRunner action mvar timeoutThread =do
              res <- action
              killThread timeoutThread
              putMVar mvar $ Just res

main :: IO ()
main = do
     res <- timeout (5 * 10 ^ 6) (getLine >>= putStrLn)
     case resof
         Nothing -> putStrLn "Timeout"
         Just x -> putStrLn "Success"

STM相关

传统的并发模型通过Mutex/Conditional Variable/Semaphore的设施来控制对共享资源的访问控制,但是这一经典模型使得编写正确高效的并发程序变得异常困难:

  .遗漏合适的锁保护导致的race condition
  .锁使用不当导致的死锁deadlock
  .异常未合适处理导致的程序崩溃
  .条件变量通知操作遗漏导致的等待处理没有被合适的唤醒
  .锁粒度控制不当造成性能下降

STM的基本设计规则如下:
   .对共享资源的访问进行控制从而使不同线程的操作相互隔离
   .规则约束:
      如果没有其它线程访问共享数据,那么当前线程对数据的修改同时对其它线程可见
      反之,当前线程的操作将被完全丢弃并自动重启

这里的要么全做要么什么也不做的方式保证了共享数据访问操作的原子性,和数据库中的Transaction很相像。


STM的主要优点之一是可组合性(composability)与模块性(modularity)。不同的fragments可能组合成较大components,而基于锁的系统就很难做到。

----------------------------------------------------------------------------------------------

transfer :: Account -> Account -> Int -> IO ( )

-- Transfer 'amount' from account 'from' to account 'to'
transfer from to amount = atomically (
      do { deposit to amount; withdraw from amount }
)

atomically :: STM a -> IO a

atomically的参数是一个类型为STM a的动作。STM动作类似于IO动作,它们都可能具有副作用,但STM动作的副作用的容许范围要小得多。 STM中你可以做的事情主要就是对事务变量(类型为TVar a)进行读写,就像我们在IO动作里面主要对IORef进行读写一样

atomically用伪代码描述如下:

  atomically action = do

    varState <- getStateOfTVars
    (newState, ret) <- runTransactionWith action varState
    success <- attemptToCommitChangesToTVars newState
    if success
    then return ret
    else atomically action -- try again


readTVar  :: TVar a -> STM a
writeTVar :: TVar a -> a -> STM ( )

跟IO动作一样,STM动作也可以由do块组合起来,实际上,do块针对STM动作进行了重载,return也是;这样它们便可以运用于STM和IO两种动作了。其实Haskell并没有特别针对IO和STM动作来重载do和return,IO和STM其实只是一个更一般的模式的特例,这个更一般的模式便是所谓的monad,do和return的重载便是通过用Haskell的非常泛化的"类型的类型"(type-class)系统来表达monad而得以实现的。

type Account =TVar Int
withdraw :: Account -> Int -> STM ( )
withdraw acc amount = do {
     bal <- readTVar acc; writeTVar acc (bal - amount)
}

我们用一个包含一个Int(账户余额)的事务变量来表示一个账户。withdraw是一个STM动作,将账户中的余额提走amount。

为了完成transfer的定义,我们可以通过withdraw来定义deposit:

deposit :: Account -> Int -> STM ( )
deposit acc amount = withdraw acc (- amount)

Haskell的类型系统优雅地阻止了我们在事务之外读写TVar


例如假设我们这样写:
bad :: Account -> IO ( )
bad acc = dohPutStr  stdout  "Withdrawing..."; withdraw acc 10 }

以上代码不能通过编译,因为hPutStr是一个IO动作,而withdraw则是一个STM动作,这两者不能放在同一个do块中。但如果我们把withdraw再放在一个atomically调用当中就可以了(atomically返回IO动作):

good :: Account -> IO ( )

good acc = dohPutStr stdout "Withdrawing..."; atomically (withdraw acc 10) }



操作

类型签名

atomically

STM a -> IO a

retry

STM a

orElse

STM a -> STM a -> STM a

newTVar

a -> STM (TVar a)

readTVar

TVar a -> STM a

writeTVar

TVar a -> a -> STM ( )


实例一 Santa.hs(来自Grey Wils的《Beautiful Code》)

module Main where

import Control.Concurrent.STM
import Control.Concurrent
import System.Random

main = do {
    elf_gp <- newGroup 3;
    sequence_ [ elf elf_gp n | n <- [1..10]];

    rein_gp <- newGroup 9;
    sequence_ [ reindeer rein_gp n | n <- [1..9]] ;

    forever (santa elf_gp rein_gp) }
  where
    elf gp id = forkIO (forever (do { elf1 gp id; randomDelay }))
    reindeer gp id = forkIO (forever (do { reindeer1 gp id; randomDelay }))


--圣诞老人是这个问题里面最有趣的,因为他会进行选择。他必须等到一组驯鹿或一组小矮人在那儿等他的时候才会继续行动。一旦他选择了是带领驯鹿还是小矮人之后,他便将他们带去做该做的事。

--santa利用awaitGroup来等待一个群准备好;choose拿到awaitGroup返回的两扇门之后便将它们传给run函数,后者依次操纵这两扇门--operatorGate会一直阻塞,直到所有小矮人(或驯鹿)都穿过门之后才会返回。
santa :: Group -> Group -> IO ()
santa elf_group rein_group = do {
    putStr "----------\n" ;
    choose [(awaitGroup rein_group, run "deliver toys"),
            (awaitGroup elf_group, run "meet in my study")] }
  where
    run :: String -> (Gate,Gate) -> IO ()
    run task (in_gate,out_gate) = do {
        putStr ("Ho! Ho! Ho! let's " ++ task ++ "\n") ;
        operateGate in_gate;
        operateGate out_gate }

helper1 :: Group -> IO () -> IO ()
helper1 group do_task = do {
    (in_gate, out_gate) <- joinGroup group;
    passGate in_gate;
    do_task;
    passGate out_gate }

elf1, reindeer1 :: Group -> Int -> IO ()
elf1 group id = helper1 group (meetInStudy id)
reindeer1 group id = helper1 group (deliverToys id)

meetInStudy id = putStr ("Elf " ++ show id ++ " meeting in the study\n")
deliverToys id = putStr ("Reindeer " ++ show id ++ " delivering toys\n")

---------------
data Group = MkGroup Int (TVar (Int, Gate, Gate))

newGroup :: Int -> IO Group
newGroup n = atomically (
    do {
        g1 <- newGate n;
        g2 <- newGate n;
        tv <- newTVar (n, g1, g2);
        return (MkGroup n tv) })

joinGroup :: Group -> IO (Gate,Gate)
joinGroup (MkGroup n tv) = atomically (
    do {
        (n_left, g1, g2) <- readTVar tv;
        check (n_left > 0);
        writeTVar tv (n_left-1, g1, g2);
        return (g1,g2) })

awaitGroup :: Group -> STM (Gate,Gate)
awaitGroup (MkGroup n tv) = do {
    (n_left, g1, g2) <- readTVar tv;
    check (n_left == 0);
    new_g1 <- newGate n;
    new_g2 <- newGate n;
    writeTVar tv (n,new_g1,new_g2);
    return (g1,g2) }

---------------
data Gate = MkGate Int (TVar Int)

newGate :: Int -> STM Gate
newGate n = do { tv <- newTVar 0; return (MkGate n tv) }

passGate :: Gate -> IO ()
passGate (MkGate n tv) = atomically (
    do { n_left <- readTVar tv;
         check (n_left > 0);
         writeTVar tv (n_left-1) })

operateGate :: Gate -> IO ()
operateGate (MkGate n tv) = do {
    atomically (
        writeTVar tv n);
        atomically (
            do { n_left <- readTVar tv; check (n_left == 0) }) }

----------------

forever :: IO () -> IO ()
-- Repeatedly perform the action
forever act = do { act; forever act }

randomDelay :: IO ()
-- Delay for a random time between 1 and 1000,000 microseconds
randomDelay = do {

    waitTime <- getStdRandom (randomR (1, 1000000));
    threadDelay waitTime }


--foldr1 orElse [x1, … , xn]的结果是x1 orElse x2 orElse x3 … orElse xn)

--choose首先在各个动作之间作一次原子选择,取得返回出来的动作(act,类型为IO( )),然后执行该动作。

choose :: [(STM a, a -> IO ())] -> IO ()
choose choices = do {
    to_do <- atomically (foldr1 orElse stm_actions);
    to_do }
where
    stm_actions :: [STM (IO ())]
    stm_actions = [ do { val <- guard; return (rhs val) } | (guard, rhs) <- choices ]

If there is no stm installed, you need to run `cabal install stm` before build it.

$ ghc Santa.hs -package stm -o  santa


实例二

module DirectAddressTable 
( DAT
, newDAT
, lookupDAT
, insertDAT
, getAssocsDAT
)
where
import Data.Array.IO
import Data.Array.MArray

newtype DAT = DAT (IOArray Int Char)

-- create a fixed size array; missing keys have value '-'.
newDAT :: Int -> IO DAT
newDAT n = do a <-newArray (0, n - 1) '-'
              return (DAT a)

-- lookup an item.
lookupDAT :: DAT -> Int -> IO (Maybe Char)
lookupDAT (DAT a) i = do   

    c <- readArray a i 
 return (if c=='-'then Nothing else Just c)

-- insert an item
insertDAT :: DAT -> Int -> Char -> IO ()
insertDAT (DAT a) i v = writeArray a i v

-- get all associations (exclude missing items, i.e. those whose value is '-').
getAssocsDAT :: DAT -> IO [(Int,Char)]
getAssocsDAT (DAT a) = do
    assocs <- getAssocs a
    return [ (k,c) | (k,c) <- assocs, c /= '-' ]

   I then have a main program that initializes a new table, forks some threads, with each thread writing and reading some fixed number of values to the just initialized table. The overall number of elements to write is fixed. The number of threads to use is a taken from a command line argument, and the elements to process are evenly divided among the threads.


-- file DirectTableTest.hs
import DirectAddressTable
import Control.Concurrent
import Control.Parallel
import System.Environment

main =  do
   args <- getArgs
   let numThreads =read (args !! 0)
   vs <- sequence (replicate numThreads newEmptyMVar)
   a <- newDAT arraySize  
   sequence_ [forkIO (doLotsOfStuff numThreads i a >>= putMVar v) | (i,v) <-zip [1..] vs]
   sequence_ [takeMVar v >>= \a -> getAssocsDAT a >>= \xs -> print (last xs)  | v <- vs]

doLotsOfStuff :: Int -> Int -> DAT -> IO DAT
doLotsOfStuff numThreads i a =  do
   let p j c = (c `seq` insertDAT a j c) >> lookupDAT a j >>= \v -> v `pseq` return ()
   sequence_ [ p j c | (j,c) <- bunchOfKeys i ]
   return a
   where bunchOfKeys i =take numElems $zip cyclicIndices $drop i cyclicChars
         numElems = numberOfElems `div` numThreads

cyclicIndices = cycle [0..highestIndex]
cyclicChars = cycle chars
chars = ['a'..'z']

-- Parameters
arraySize :: Int
arraySize = 100
highestIndex = arraySize - 1
numberOfElems = 10 * 1000 * 1000

   编译 & 运行

  >ghc --make -rtsopts -threaded -fforce-recomp -O2 DirectTableTest.hs

  >time ./DirectTableTest 1 +RTS -N1

  >time ./DirectTableTest 2 +RTS -N2



这篇关于haskell并发编程的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/757925

相关文章

MySQL的JDBC编程详解

《MySQL的JDBC编程详解》:本文主要介绍MySQL的JDBC编程,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录前言一、前置知识1. 引入依赖2. 认识 url二、JDBC 操作流程1. JDBC 的写操作2. JDBC 的读操作总结前言本文介绍了mysq

Web服务器-Nginx-高并发问题

《Web服务器-Nginx-高并发问题》Nginx通过事件驱动、I/O多路复用和异步非阻塞技术高效处理高并发,结合动静分离和限流策略,提升性能与稳定性... 目录前言一、架构1. 原生多进程架构2. 事件驱动模型3. IO多路复用4. 异步非阻塞 I/O5. Nginx高并发配置实战二、动静分离1. 职责2

Python异步编程之await与asyncio基本用法详解

《Python异步编程之await与asyncio基本用法详解》在Python中,await和asyncio是异步编程的核心工具,用于高效处理I/O密集型任务(如网络请求、文件读写、数据库操作等),接... 目录一、核心概念二、使用场景三、基本用法1. 定义协程2. 运行协程3. 并发执行多个任务四、关键

AOP编程的基本概念与idea编辑器的配合体验过程

《AOP编程的基本概念与idea编辑器的配合体验过程》文章简要介绍了AOP基础概念,包括Before/Around通知、PointCut切入点、Advice通知体、JoinPoint连接点等,说明它们... 目录BeforeAroundAdvise — 通知PointCut — 切入点Acpect — 切面

Spring Security 前后端分离场景下的会话并发管理

《SpringSecurity前后端分离场景下的会话并发管理》本文介绍了在前后端分离架构下实现SpringSecurity会话并发管理的问题,传统Web开发中只需简单配置sessionManage... 目录背景分析传统 web 开发中的 sessionManagement 入口ConcurrentSess

C#异步编程ConfigureAwait的使用小结

《C#异步编程ConfigureAwait的使用小结》本文介绍了异步编程在GUI和服务器端应用的优势,详细的介绍了async和await的关键作用,通过实例解析了在UI线程正确使用await.Conf... 异步编程是并发的一种形式,它有两大好处:对于面向终端用户的GUI程序,提高了响应能力对于服务器端应

MySQL中处理数据的并发一致性的实现示例

《MySQL中处理数据的并发一致性的实现示例》在MySQL中处理数据的并发一致性是确保多个用户或应用程序同时访问和修改数据库时,不会导致数据冲突、数据丢失或数据不一致,MySQL通过事务和锁机制来管理... 目录一、事务(Transactions)1. 事务控制语句二、锁(Locks)1. 锁类型2. 锁粒

深入解析Java NIO在高并发场景下的性能优化实践指南

《深入解析JavaNIO在高并发场景下的性能优化实践指南》随着互联网业务不断演进,对高并发、低延时网络服务的需求日益增长,本文将深入解析JavaNIO在高并发场景下的性能优化方法,希望对大家有所帮助... 目录简介一、技术背景与应用场景二、核心原理深入分析2.1 Selector多路复用2.2 Buffer

C# async await 异步编程实现机制详解

《C#asyncawait异步编程实现机制详解》async/await是C#5.0引入的语法糖,它基于**状态机(StateMachine)**模式实现,将异步方法转换为编译器生成的状态机类,本... 目录一、async/await 异步编程实现机制1.1 核心概念1.2 编译器转换过程1.3 关键组件解析

go动态限制并发数量的实现示例

《go动态限制并发数量的实现示例》本文主要介绍了Go并发控制方法,通过带缓冲通道和第三方库实现并发数量限制,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面... 目录带有缓冲大小的通道使用第三方库其他控制并发的方法因为go从语言层面支持并发,所以面试百分百会问到