FunDA(2)- Streaming Data Operation:流式数据操作

2024-04-09 04:58

本文主要是介绍FunDA(2)- Streaming Data Operation:流式数据操作,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

   在上一集的讨论里我们介绍并实现了强类型返回结果行。使用强类型主要的目的是当我们把后端数据库SQL批次操作搬到内存里转变成数据流式按行操作时能更方便、准确、高效地选定数据字段。在上集讨论示范里我们用集合的foreach方式模拟了一个最简单的数据流,并把从数据库里批次读取的数据集转换成一串连续的数据行来逐行使用。一般来说完整的流式数据处理流程包括了从数据库中读取数据、根据读取的每行数据状态再对后台数据库进行更新,包括:插入新数据、更新、删除等。那么在上篇中实现的流式操作基础上再添加一种指令行类型就可以完善整个数据处理流程了,就像下面这个图示:

Database => Query -> Collection => Streaming -> DataRow => QueryAction(DataRow) -> ActionRow => execAction(ActionRow) -> Database

如果我们还是以Slick为目标FRM,那么这个ActionRow的类型就是Slick的DBIO[T]了:

package com.bayakala.funda.rowtypes
import slick.dbio._
object ActionType {type FDAAction[T] = DBIO[T]
}


记得有一次在一个Scala讨论区里遇到这样一个问题:如何把a表里的status字段更新成b表的status字段值,转化成SQL语句如下:

update a,b set a.status=b.status where a.id=b.id


那位哥们的问题是如何用Slick来实现对a表的更新,不能用sql"???" interpolation 直接调用SQL语句,可能因为要求compile time语法check保障吧。这个问题用Slick Query还真的不太容易解决(能不能解决就不想费功夫去想了),这是因为FRM的SQL批次处理弱点。如果用FunDA的流式操作思路就会很容易解决了,只要用join Query把b.status读出来再用b.id=a.id逐个更新a.status。刚好,下面我们就示范通过ActionRow来解决这个问题。先用下面这段代码来设置测试数据:

import slick.dbio.DBIO
import slick.driver.H2Driver.api._import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
import scala.util.{Failure, Success}
import scala.concurrent.ExecutionContext.Implicits.global
import slick.jdbc.meta.MTable
object ActionRowTest extends App {class ATable(tag: Tag) extends Table[(Int,String,Int)](tag,"TA")  {def id = column[Int]("id",O.PrimaryKey)def flds = column[String]("aflds")def status = column[Int]("status")def * = (id,flds,status)}val tableA = TableQuery[ATable]class BTable(tag: Tag) extends Table[(Int,String,Int)](tag,"TB")  {def id = column[Int]("id",O.PrimaryKey)def flds = column[String]("bflds")def status = column[Int]("status")def * = (id,flds,status)}val tableB = TableQuery[BTable]val insertAAction =tableA ++= Seq ((1,"aaa",0),(2,"bbb",3),(3,"ccc",1),(4,"ddd",0),(16,"kkk",16))val insertBAction =tableB ++= Seq ((1,"aaa",1),(2,"bbb",2),(3,"ccc",3),(4,"ddd",4),(5,"kkk",5))val db = Database.forConfig("h2db")def tableExists(tables: Vector[MTable], tblname: String) =tables.exists {t =>t.name.toString.contains(tblname)}def createSchemaIfNotExists(): Future[Unit] = {db.run(MTable.getTables).flatMap {case tables if !tableExists(tables,".TA") && !tableExists(tables,".TB") =>println("Creating schemas for TA and TB...")db.run((tableA.schema ++ tableB.schema).create)case tables if !tableExists(tables,".TA") =>println("Creating schema for TA ...")db.run(tableA.schema.create)case tables if !tableExists(tables,".TB") =>println("Creating schema for TB ...")db.run(tableB.schema.create)case _ =>println("Schema for TA, TB already created.")Future.successful()}}def insertInitialData(): Future[Unit] = {val cleanInsert = DBIO.seq(tableA.delete, tableB.delete,insertAAction,insertBAction)db.run(cleanInsert).andThen {case Success(_) => println("Data insert completed.")case Failure(e) => println(s"Data insert failed [${e.getMessage}]")}}Await.ready(db.run(sql"DROP TABLE TA; DROP TABLE TB".as[String]),Duration.Inf)val initResult = createSchemaIfNotExists().flatMap {_ => insertInitialData()}Await.ready(initResult,Duration.Inf)}

用join query先把这两个表相关的字段值搬到内存转成强类型行FDADataRow:

 val selectAB = for {a <- tableAb <- tableBif (a.id === b.id)} yield (a.id,b.id,a.status,b.status)case class ABRow (id: Int, asts: Int, bsts: Int)def toABRow(raw: (Int,Int,Int,Int)) = ABRow(raw._1,raw._3,raw._4)import com.bayakala.funda.rowtypes.DataRowTypeval loader = FDADataRow(slick.driver.H2Driver, toABRow _)loader.getTypedRows(selectAB.result)(db).foreach {dataRow =>println(s"ID:${dataRow.id} Status A = ${dataRow.asts}, B = ${dataRow.bsts}")}


初始结果如下:

ID:1 Status A = 0, B = 1
ID:2 Status A = 3, B = 2
ID:3 Status A = 1, B = 3
ID:4 Status A = 0, B = 4


现在我们把每条数据行DataRow转成动作行ActionRow。然后把每条DataRow的asts字段值替换成bsts的字段值:

import com.bayakala.funda.rowtypes.ActionType.FDAActiondef updateAStatus(row: ABRow): FDAAction[Int] = {tableA.filter{r => r.id === row.id}.map(_.status).update(row.asts)}loader.getTypedRows(selectAB.result)(db).map(updateAStatus(_)).foreach {actionRow =>println(s"${actionRow.toString}")}


显示结果如下:

slick.driver.JdbcActionComponent$UpdateActionExtensionMethodsImpl$$anon$7@492691d7
slick.driver.JdbcActionComponent$UpdateActionExtensionMethodsImpl$$anon$7@27216cd
slick.driver.JdbcActionComponent$UpdateActionExtensionMethodsImpl$$anon$7@558bdf1f
slick.driver.JdbcActionComponent$UpdateActionExtensionMethodsImpl$$anon$7@8576fa0

现在每条DataRow已经被转化成jdbc action类型了。

下一步我们只需要运行这些ActionRow就可以完成任务了:

  def execAction(act: FDAAction[Int]) = db.run(act)loader.getTypedRows(selectAB.result)(db).map(updateAStatus(_)).map(execAction(_))


现在再看看数据库中的TA表状态:

  loader.getTypedRows(selectAB.result)(db).foreach {dataRow =>println(s"ID:${dataRow.id} Status A = ${dataRow.asts}, B = ${dataRow.bsts}")}结果:
ID:1 Status A = 1, B = 1
ID:2 Status A = 2, B = 2
ID:3 Status A = 3, B = 3
ID:4 Status A = 4, B = 4


我们看到已经正确更新了TA的status字段值。

在这个示范中明显有很多不足之处:如果a.status=b.status应该省略更新步骤。这是因为foreach只能模拟最基本的数据流动。如果我们使用了具备强大功能的Stream工具库如scalaz-stream-fs2,就可以更好控制数据元素的流动。更重要的是scalaz-stream-fs2支持并行运算,那么上面所描述的流程:

Database => Query -> Collection => Streaming -> DataRow => QueryAction(DataRow) -> ActionRow => execAction(ActionRow) -> Database


几个 => 环节:Query、Streaming、QueryAction、execAction将可以并行运算,从而实现充分利用多核CPU硬件资源,提高运算效率的目的。

下面是这次讨论涉及的源代码:

package com.bayakala.funda.rowtypesimport scala.concurrent.duration._
import scala.concurrent.Await
import slick.driver.JdbcProfileobject DataRowType {class FDADataRow[SOURCE, TARGET](slickProfile: JdbcProfile,convert: SOURCE  => TARGET){import slickProfile.api._def getTypedRows(slickAction: DBIO[Iterable[SOURCE]])(slickDB: Database): Iterable[TARGET] =Await.result(slickDB.run(slickAction), Duration.Inf).map(raw => convert(raw))}object FDADataRow {def apply[SOURCE, TARGET](slickProfile: JdbcProfile, converter: SOURCE => TARGET): FDADataRow[SOURCE, TARGET] =new FDADataRow[SOURCE, TARGET](slickProfile, converter)}}

package com.bayakala.funda.rowtypes
import slick.dbio._
object ActionType {type FDAAction[T] = DBIO[T]
}

import slick.dbio.DBIO
import slick.driver.H2Driver.api._import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
import scala.util.{Failure, Success}
import scala.concurrent.ExecutionContext.Implicits.global
import slick.jdbc.meta.MTable
object ActionRowTest extends App {class ATable(tag: Tag) extends Table[(Int,String,Int)](tag,"TA")  {def id = column[Int]("id",O.PrimaryKey)def flds = column[String]("aflds")def status = column[Int]("status")def * = (id,flds,status)}val tableA = TableQuery[ATable]class BTable(tag: Tag) extends Table[(Int,String,Int)](tag,"TB")  {def id = column[Int]("id",O.PrimaryKey)def flds = column[String]("bflds")def status = column[Int]("status")def * = (id,flds,status)}val tableB = TableQuery[BTable]val insertAAction =tableA ++= Seq ((1,"aaa",0),(2,"bbb",3),(3,"ccc",1),(4,"ddd",0),(16,"kkk",16))val insertBAction =tableB ++= Seq ((1,"aaa",1),(2,"bbb",2),(3,"ccc",3),(4,"ddd",4),(5,"kkk",5))val db = Database.forConfig("h2db")def tableExists(tables: Vector[MTable], tblname: String) =tables.exists {t =>t.name.toString.contains(tblname)}def createSchemaIfNotExists(): Future[Unit] = {db.run(MTable.getTables).flatMap {case tables if !tableExists(tables,".TA") && !tableExists(tables,".TB") =>println("Creating schemas for TA and TB...")db.run((tableA.schema ++ tableB.schema).create)case tables if !tableExists(tables,".TA") =>println("Creating schema for TA ...")db.run(tableA.schema.create)case tables if !tableExists(tables,".TB") =>println("Creating schema for TB ...")db.run(tableB.schema.create)case _ =>println("Schema for TA, TB already created.")Future.successful()}}def insertInitialData(): Future[Unit] = {val cleanInsert = DBIO.seq(tableA.delete, tableB.delete,insertAAction,insertBAction)db.run(cleanInsert).andThen {case Success(_) => println("Data insert completed.")case Failure(e) => println(s"Data insert failed [${e.getMessage}]")}}Await.ready(db.run(sql"DROP TABLE TA; DROP TABLE TB".as[String]),Duration.Inf)val initResult = createSchemaIfNotExists().flatMap {_ => insertInitialData()}Await.ready(initResult,Duration.Inf)val selectAB = for {a <- tableAb <- tableBif (a.id === b.id)} yield (a.id,b.id,a.status,b.status)case class ABRow (id: Int, asts: Int, bsts: Int)def toABRow(raw: (Int,Int,Int,Int)) = ABRow(raw._1,raw._3,raw._4)import com.bayakala.funda.rowtypes.DataRowType.FDADataRowval loader = FDADataRow(slick.driver.H2Driver, toABRow _)loader.getTypedRows(selectAB.result)(db).foreach {dataRow =>println(s"ID:${dataRow.id} Status A = ${dataRow.asts}, B = ${dataRow.bsts}")}import com.bayakala.funda.rowtypes.ActionType.FDAActiondef updateAStatus(row: ABRow): FDAAction[Int] = {tableA.filter{r => r.id === row.id}.map(_.status).update(row.bsts)}loader.getTypedRows(selectAB.result)(db).map(updateAStatus(_)).foreach {actionRow =>println(s"${actionRow.toString}")}def execAction(act: FDAAction[Int]) = db.run(act)loader.getTypedRows(selectAB.result)(db).map(updateAStatus(_)).map(execAction(_))loader.getTypedRows(selectAB.result)(db).foreach {dataRow =>println(s"ID:${dataRow.id} Status A = ${dataRow.asts}, B = ${dataRow.bsts}")}}



 




这篇关于FunDA(2)- Streaming Data Operation:流式数据操作的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/887231

相关文章

Python使用vllm处理多模态数据的预处理技巧

《Python使用vllm处理多模态数据的预处理技巧》本文深入探讨了在Python环境下使用vLLM处理多模态数据的预处理技巧,我们将从基础概念出发,详细讲解文本、图像、音频等多模态数据的预处理方法,... 目录1. 背景介绍1.1 目的和范围1.2 预期读者1.3 文档结构概述1.4 术语表1.4.1 核

Java操作Word文档的全面指南

《Java操作Word文档的全面指南》在Java开发中,操作Word文档是常见的业务需求,广泛应用于合同生成、报表输出、通知发布、法律文书生成、病历模板填写等场景,本文将全面介绍Java操作Word文... 目录简介段落页头与页脚页码表格图片批注文本框目录图表简介Word编程最重要的类是org.apach

MySQL 删除数据详解(最新整理)

《MySQL删除数据详解(最新整理)》:本文主要介绍MySQL删除数据的相关知识,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录一、前言二、mysql 中的三种删除方式1.DELETE语句✅ 基本语法: 示例:2.TRUNCATE语句✅ 基本语

Python实现对阿里云OSS对象存储的操作详解

《Python实现对阿里云OSS对象存储的操作详解》这篇文章主要为大家详细介绍了Python实现对阿里云OSS对象存储的操作相关知识,包括连接,上传,下载,列举等功能,感兴趣的小伙伴可以了解下... 目录一、直接使用代码二、详细使用1. 环境准备2. 初始化配置3. bucket配置创建4. 文件上传到os

mysql表操作与查询功能详解

《mysql表操作与查询功能详解》本文系统讲解MySQL表操作与查询,涵盖创建、修改、复制表语法,基本查询结构及WHERE、GROUPBY等子句,本文结合实例代码给大家介绍的非常详细,感兴趣的朋友跟随... 目录01.表的操作1.1表操作概览1.2创建表1.3修改表1.4复制表02.基本查询操作2.1 SE

MyBatisPlus如何优化千万级数据的CRUD

《MyBatisPlus如何优化千万级数据的CRUD》最近负责的一个项目,数据库表量级破千万,每次执行CRUD都像走钢丝,稍有不慎就引起数据库报警,本文就结合这个项目的实战经验,聊聊MyBatisPl... 目录背景一、MyBATis Plus 简介二、千万级数据的挑战三、优化 CRUD 的关键策略1. 查

python实现对数据公钥加密与私钥解密

《python实现对数据公钥加密与私钥解密》这篇文章主要为大家详细介绍了如何使用python实现对数据公钥加密与私钥解密,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录公钥私钥的生成使用公钥加密使用私钥解密公钥私钥的生成这一部分,使用python生成公钥与私钥,然后保存在两个文

mysql中的数据目录用法及说明

《mysql中的数据目录用法及说明》:本文主要介绍mysql中的数据目录用法及说明,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录1、背景2、版本3、数据目录4、总结1、背景安装mysql之后,在安装目录下会有一个data目录,我们创建的数据库、创建的表、插入的

c++中的set容器介绍及操作大全

《c++中的set容器介绍及操作大全》:本文主要介绍c++中的set容器介绍及操作大全,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录​​一、核心特性​​️ ​​二、基本操作​​​​1. 初始化与赋值​​​​2. 增删查操作​​​​3. 遍历方

MySQL追踪数据库表更新操作来源的全面指南

《MySQL追踪数据库表更新操作来源的全面指南》本文将以一个具体问题为例,如何监测哪个IP来源对数据库表statistics_test进行了UPDATE操作,文内探讨了多种方法,并提供了详细的代码... 目录引言1. 为什么需要监控数据库更新操作2. 方法1:启用数据库审计日志(1)mysql/mariad