restapi(8)- restapi-sql:用户自主的服务

2024-04-09 04:38

本文主要是介绍restapi(8)- restapi-sql:用户自主的服务,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

  学习函数式编程初衷是看到自己熟悉的oop编程语言和sql数据库在现代商业社会中前景暗淡,准备完全放弃windows技术栈转到分布式大数据技术领域的,但是在现实中理想总是不如人意的。本来想在一个规模较小的公司展展拳脚,以为小公司会少点历史包袱,有利于全面技术改造。但现实是:即使是小公司,一旦有个成熟的产品,那么进行全面的技术更新基本上是不可能的了,因为公司要生存,开发人员很难新旧技术之间随时切换。除非有狂热的热情,员工怠慢甚至抵制情绪不容易解决。只能采取逐步切换方式:保留原有产品的后期维护不动,新产品开发用一些新的技术。在我们这里的情况就是:以前一堆c#、sqlserver的东西必须保留,新的功能比如大数据、ai、识别等必须用新的手段如scala、python、dart、akka、kafka、cassandra、mongodb来开发。好了,新旧两个开发平台之间的软件系统对接又变成了一个问题。

   现在我们这里又个需求:把在linux-ubuntu akka-cluster集群环境里mongodb里数据处理的结果传给windows server下SQLServer里。这是一种典型的异系统集成场景。我的解决方案是通过一个restapi服务作为两个系统的数据桥梁,这个restapi的最基本要求是:

1、支持任何操作系统前端:这个没什么问题,在http层上通过json交换数据

2、能读写mongodb:在前面讨论的restapi-mongo已经实现了这一功能

3、能读写windows server环境下的sqlserver:这个是本篇讨论的主题

前面曾经实现了一个jdbc-engine项目,基于scalikejdbc,不过只示范了slick-h2相关的功能。现在需要sqlserver-jdbc驱动,然后试试能不能在JVM里驱动windows下的sqlserver。maven里找不到sqlserver的驱动,但从微软官网可以下载mssql-jdbc-7.0.0.jre8.jar。这是个jar,在sbt里称作unmanagedjar,不能摆在build.sbt的dependency里。这个需要摆在项目根目录下的lib目录下即可(也可以在放在build.sbt里unmanagedBase :=?? 指定的路径下)。然后是数据库连接,下面是可以使用sqlserver的application.conf配置文件内容:

# JDBC settings
prod {db {h2 {driver = "org.h2.Driver"url = "jdbc:h2:tcp://localhost/~/slickdemo"user = ""password = ""poolFactoryName = "hikaricp"numThreads = 10maxConnections = 12minConnections = 4keepAliveConnection = true}mysql {driver = "com.mysql.cj.jdbc.Driver"url = "jdbc:mysql://localhost:3306/testdb"user = "root"password = "123"poolFactoryName = "hikaricp"numThreads = 10maxConnections = 12minConnections = 4keepAliveConnection = true}postgres {driver = "org.postgresql.Driver"url = "jdbc:postgresql://localhost:5432/testdb"user = "root"password = "123"poolFactoryName = "hikaricp"numThreads = 10maxConnections = 12minConnections = 4keepAliveConnection = true}mssql {driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"url = "jdbc:sqlserver://192.168.11.164:1433;integratedSecurity=false;Connect Timeout=3000"user = "sa"password = "Tiger2020"poolFactoryName = "hikaricp"numThreads = 10maxConnections = 12minConnections = 4keepAliveConnection = trueconnectionTimeout = 3000}termtxns {driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"url = "jdbc:sqlserver://192.168.11.164:1433;DATABASE=TERMTXNS;integratedSecurity=false;Connect Timeout=3000"user = "sa"password = "Tiger2020"poolFactoryName = "hikaricp"numThreads = 10maxConnections = 12minConnections = 4keepAliveConnection = trueconnectionTimeout = 3000}crmdb {driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"url = "jdbc:sqlserver://192.168.11.164:1433;DATABASE=CRMDB;integratedSecurity=false;Connect Timeout=3000"user = "sa"password = "Tiger2020"poolFactoryName = "hikaricp"numThreads = 10maxConnections = 12minConnections = 4keepAliveConnection = trueconnectionTimeout = 3000}}# scallikejdbc Global settingsscalikejdbc.global.loggingSQLAndTime.enabled = truescalikejdbc.global.loggingSQLAndTime.logLevel = infoscalikejdbc.global.loggingSQLAndTime.warningEnabled = truescalikejdbc.global.loggingSQLAndTime.warningThresholdMillis = 1000scalikejdbc.global.loggingSQLAndTime.warningLogLevel = warnscalikejdbc.global.loggingSQLAndTime.singleLineMode = falsescalikejdbc.global.loggingSQLAndTime.printUnprocessedStackTrace = falsescalikejdbc.global.loggingSQLAndTime.stackTraceDepth = 10
}

这个文件里的mssql,termtxns,crmdb段落都是给sqlserver的,它们都使用hikaricp线程池管理。

在jdbc-engine里启动数据库方式如下:

  ConfigDBsWithEnv("prod").setup('termtxns)ConfigDBsWithEnv("prod").setup('crmdb)ConfigDBsWithEnv("prod").loadGlobalSettings()

这段打开了在配置文件中用termtxns,crmdb注明的数据库。

下面是SqlHttpServer.scala的代码:

package com.datatech.rest.sql
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._
import pdi.jwt._
import AuthBase._
import MockUserAuthService._
import com.datatech.sdp.jdbc.config.ConfigDBsWithEnvimport akka.actor.ActorSystem
import akka.stream.ActorMaterializerimport Repo._
import SqlRoute._object SqlHttpServer extends App {implicit val httpSys = ActorSystem("sql-http-sys")implicit val httpMat = ActorMaterializer()implicit val httpEC = httpSys.dispatcherConfigDBsWithEnv("prod").setup('termtxns)ConfigDBsWithEnv("prod").setup('crmdb)ConfigDBsWithEnv("prod").loadGlobalSettings()implicit val authenticator = new AuthBase().withAlgorithm(JwtAlgorithm.HS256).withSecretKey("OpenSesame").withUserFunc(getValidUser)val route =path("auth") {authenticateBasic(realm = "auth", authenticator.getUserInfo) { userinfo =>post { complete(authenticator.issueJwt(userinfo))}}} ~pathPrefix("api") {authenticateOAuth2(realm = "api", authenticator.authenticateToken) { token =>new SqlRoute("sql", token)(new JDBCRepo).route// ~ ...}}val (port, host) = (50081,"192.168.11.189")val bindingFuture = Http().bindAndHandle(route,host,port)println(s"Server running at $host $port. Press any key to exit ...")scala.io.StdIn.readLine()bindingFuture.flatMap(_.unbind()).onComplete(_ => httpSys.terminate())}

服务入口在http://mydemo.com/api/sql,服务包括get,post,put三类,看看这个SqlRoute:

package com.datatech.rest.sql
import akka.http.scaladsl.server.Directives
import akka.stream.ActorMaterializer
import akka.http.scaladsl.model._
import akka.actor.ActorSystem
import com.datatech.rest.sql.Repo.JDBCRepo
import akka.http.scaladsl.common._
import spray.json.DefaultJsonProtocol
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupporttrait JsFormats extends SprayJsonSupport with DefaultJsonProtocol
object JsConverters extends JsFormats {import SqlModels._implicit val brandFormat = jsonFormat2(Brand)implicit val customerFormat = jsonFormat6(Customer)
}object SqlRoute {import JsConverters._implicit val jsonStreamingSupport = EntityStreamingSupport.json().withParallelMarshalling(parallelism = 8, unordered = false)class SqlRoute(val pathName: String, val jwt: String)(repo: JDBCRepo)(implicit  sys: ActorSystem, mat: ActorMaterializer) extends Directives with JsonConverter {val route = pathPrefix(pathName) {path(Segment / Remaining) { case (db, tbl) =>(get & parameter('sqltext)) { sql => {val rsc = new RSConverterval rows = repo.query[Map[String,Any]](db, sql, rsc.resultSet2Map)complete(rows.map(m => toJson(m)))}} ~ (post & parameter('sqltext)) { sql =>entity(as[String]){ json =>repo.batchInsert(db,tbl,sql,json)complete(StatusCodes.OK)}} ~ put {entity(as[Seq[String]]) { sqls =>repo.update(db, sqls)complete(StatusCodes.OK)}}}}}
}

jdbc-engine的特点是可以用字符类型的sql语句来操作。所以我们可以通过传递字符串型的sql语句来实现服务调用,很通用。restapi-sql提供的是对服务器端sqlserver的普通操作,包括读get,写入post,更改put。这些sqlserver操作部分是在JDBCRepo里的:

package com.datatech.rest.sql
import com.datatech.sdp.jdbc.engine.JDBCEngine._
import com.datatech.sdp.jdbc.engine.{JDBCQueryContext, JDBCUpdateContext}
import scalikejdbc._
import akka.stream.ActorMaterializer
import com.datatech.sdp.result.DBOResult.DBOResult
import akka.stream.scaladsl._
import scala.concurrent._
import SqlModels._object Repo {class JDBCRepo(implicit ec: ExecutionContextExecutor, mat: ActorMaterializer) {def query[R](db: String, sqlText: String, toRow: WrappedResultSet => R): Source[R,Any] = {//construct the contextval ctx = JDBCQueryContext(dbName = Symbol(db),statement = sqlText)jdbcAkkaStream(ctx,toRow)}def query(db: String, tbl: String, sqlText: String) = {//construct the contextval ctx = JDBCQueryContext(dbName = Symbol(db),statement = sqlText)jdbcQueryResult[Vector,RS](ctx,getConverter(tbl)).toFuture[Vector[RS]]}def update(db: String, sqlTexts: Seq[String]): DBOResult[Seq[Long]] = {val ctx = JDBCUpdateContext(dbName = Symbol(db),statements = sqlTexts)jdbcTxUpdates(ctx)}def bulkInsert[P](db: String, sqlText: String, prepParams: P => Seq[Any], params: Source[P,_]) = {val insertAction = JDBCActionStream(dbName = Symbol(db),parallelism = 4,processInOrder = false,statement = sqlText,prepareParams = prepParams)params.via(insertAction.performOnRow).to(Sink.ignore).run()}def batchInsert(db: String, tbl: String, sqlText: String, jsonParams: String):DBOResult[Seq[Long]] = {val ctx = JDBCUpdateContext(dbName = Symbol(db),statements = Seq(sqlText),batch = true,parameters = getSeqParams(jsonParams,sqlText))jdbcBatchUpdate[Seq](ctx)}}import monix.execution.Scheduler.Implicits.globalimplicit class DBResultToFuture(dbr: DBOResult[_]){def toFuture[R] = {dbr.value.value.runToFuture.map {eor =>eor match {case Right(or) => or match {case Some(r) => r.asInstanceOf[R]case None => throw new RuntimeException("Operation produced None result!")}case Left(err) => throw new RuntimeException(err)}}}}
}

读query部分即 def query[R](db: String, sqlText: String, toRow: WrappedResultSet => R): Source[R,Any] = {...} 这个函数返回Source[R,Any],下面我们好好谈谈这个R:R是读的结果,通常是某个类或model,比如读取Person记录返回一组Person类的实例。这里有一种强类型的感觉。一开始我也是随大流坚持建model后用toJson[E],fromJson[E]这样做线上数据转换。现在的问题是restapi-sql是一项公共服务,使用者知道sqlserver上有些什么表,然后希望通过sql语句来从这些表里读取数据。这些sql语句可能超出表的界限如sql join, union等,如果我们坚持每个返回结果都必须有个对应的model,那么显然就会牺牲这个服务的通用性。实际上,http线上数据交换本身就不可能是强类型的,因为经过了json转换。对于json转换来说,只要求字段名称、字段类型对称就行了。至于从什么类型转换成了另一个什么类型都没问题。所以,字段名+字段值的表现形式不就是Map[K,V]吗,我们就用Map[K,V]作为万能model就行了,没人知道。也就是说我们可以把jdbc的ResultSet转成Map[K,V]然后再转成json,接收方可以获取与model同样的字段名和字段值。好,就把ResultSet转成Map[String,Any]:

package com.datatech.rest.sql
import scalikejdbc._
import java.sql.ResultSetMetaData
class RSConverter {import RSConverterUtil._var rsMeta: ResultSetMetaData = _var columnCount: Int = 0var rsFields: List[(String,String)] = List[(String,String)]()def getFieldsInfo:List[(String,String)] =( 1 until columnCount).foldLeft(List[(String,String)]()) {case (cons,i) =>(rsMeta.getColumnName(i) -> rsMeta.getColumnTypeName(i)) :: cons}def resultSet2Map(rs: WrappedResultSet): Map[String,Any] = {if(columnCount == 0) {rsMeta =  rs.underlying.getMetaDatacolumnCount = rsMeta.getColumnCountrsFields = getFieldsInfo}rsFields.foldLeft(Map[String,Any]()) {case (m,(n,t)) =>m + (n -> rsFieldValue(n,t,rs))}}
}
object RSConverterUtil {import scala.collection.immutable.TreeMapdef map2Params(stm: String, m: Map[String,Any]): Seq[Any] = {val sortedParams = m.foldLeft(TreeMap[Int,Any]()) {case (t,(k,v)) => t + (stm.indexOfSlice(k) -> v)}sortedParams.map(_._2).toSeq}def rsFieldValue(fldname: String, fldType: String, rs: WrappedResultSet): Any = fldType match {case "LONGVARCHAR" => rs.string(fldname)case "VARCHAR" => rs.string(fldname)case "CHAR" => rs.string(fldname)case "BIT" => rs.boolean(fldname)case "TIME" => rs.time(fldname)case "TIMESTAMP" => rs.timestamp(fldname)case "ARRAY" => rs.array(fldname)case "NUMERIC" => rs.bigDecimal(fldname)case "BLOB" => rs.blob(fldname)case "TINYINT" => rs.byte(fldname)case "VARBINARY" => rs.bytes(fldname)case "BINARY" => rs.bytes(fldname)case "CLOB" => rs.clob(fldname)case "DATE" => rs.date(fldname)case "DOUBLE" => rs.double(fldname)case "REAL" => rs.float(fldname)case "FLOAT" => rs.float(fldname)case "INTEGER" => rs.int(fldname)case "SMALLINT" => rs.int(fldname)case "Option[Int]" => rs.intOpt(fldname)case "BIGINT" => rs.long(fldname)}
}

下面是个调用query服务的例子:

    val getAllRequest = HttpRequest(HttpMethods.GET,uri = "http://192.168.11.189:50081/api/sql/termtxns/brand?sqltext=SELECT%20*%20FROM%20BRAND",).addHeader(authentication)(for {response <- Http().singleRequest(getAllRequest)message <- Unmarshal(response.entity).to[String]} yield message).andThen {case Success(msg) => println(s"Received message: $msg")case Failure(err) => println(s"Error: ${err.getMessage}")}

特点是我只需要提供sql语句,服务就会返回一个json数组,然后我怎么转换json就随我高兴了。

这篇关于restapi(8)- restapi-sql:用户自主的服务的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/887194

相关文章

一文详解MySQL如何设置自动备份任务

《一文详解MySQL如何设置自动备份任务》设置自动备份任务可以确保你的数据库定期备份,防止数据丢失,下面我们就来详细介绍一下如何使用Bash脚本和Cron任务在Linux系统上设置MySQL数据库的自... 目录1. 编写备份脚本1.1 创建并编辑备份脚本1.2 给予脚本执行权限2. 设置 Cron 任务2

SQL Server修改数据库名及物理数据文件名操作步骤

《SQLServer修改数据库名及物理数据文件名操作步骤》在SQLServer中重命名数据库是一个常见的操作,但需要确保用户具有足够的权限来执行此操作,:本文主要介绍SQLServer修改数据... 目录一、背景介绍二、操作步骤2.1 设置为单用户模式(断开连接)2.2 修改数据库名称2.3 查找逻辑文件名

SQL Server数据库死锁处理超详细攻略

《SQLServer数据库死锁处理超详细攻略》SQLServer作为主流数据库管理系统,在高并发场景下可能面临死锁问题,影响系统性能和稳定性,这篇文章主要给大家介绍了关于SQLServer数据库死... 目录一、引言二、查询 Sqlserver 中造成死锁的 SPID三、用内置函数查询执行信息1. sp_w

canal实现mysql数据同步的详细过程

《canal实现mysql数据同步的详细过程》:本文主要介绍canal实现mysql数据同步的详细过程,本文通过实例图文相结合给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的... 目录1、canal下载2、mysql同步用户创建和授权3、canal admin安装和启动4、canal

SQL中JOIN操作的条件使用总结与实践

《SQL中JOIN操作的条件使用总结与实践》在SQL查询中,JOIN操作是多表关联的核心工具,本文将从原理,场景和最佳实践三个方面总结JOIN条件的使用规则,希望可以帮助开发者精准控制查询逻辑... 目录一、ON与WHERE的本质区别二、场景化条件使用规则三、最佳实践建议1.优先使用ON条件2.WHERE用

MySQL存储过程之循环遍历查询的结果集详解

《MySQL存储过程之循环遍历查询的结果集详解》:本文主要介绍MySQL存储过程之循环遍历查询的结果集,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录前言1. 表结构2. 存储过程3. 关于存储过程的SQL补充总结前言近来碰到这样一个问题:在生产上导入的数据发现

SpringBoot服务获取Pod当前IP的两种方案

《SpringBoot服务获取Pod当前IP的两种方案》在Kubernetes集群中,SpringBoot服务获取Pod当前IP的方案主要有两种,通过环境变量注入或通过Java代码动态获取网络接口IP... 目录方案一:通过 Kubernetes Downward API 注入环境变量原理步骤方案二:通过

MySQL 衍生表(Derived Tables)的使用

《MySQL衍生表(DerivedTables)的使用》本文主要介绍了MySQL衍生表(DerivedTables)的使用,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学... 目录一、衍生表简介1.1 衍生表基本用法1.2 自定义列名1.3 衍生表的局限在SQL的查询语句select

MySQL 横向衍生表(Lateral Derived Tables)的实现

《MySQL横向衍生表(LateralDerivedTables)的实现》横向衍生表适用于在需要通过子查询获取中间结果集的场景,相对于普通衍生表,横向衍生表可以引用在其之前出现过的表名,本文就来... 目录一、横向衍生表用法示例1.1 用法示例1.2 使用建议前面我们介绍过mysql中的衍生表(From子句

六个案例搞懂mysql间隙锁

《六个案例搞懂mysql间隙锁》MySQL中的间隙是指索引中两个索引键之间的空间,间隙锁用于防止范围查询期间的幻读,本文主要介绍了六个案例搞懂mysql间隙锁,具有一定的参考价值,感兴趣的可以了解一下... 目录概念解释间隙锁详解间隙锁触发条件间隙锁加锁规则案例演示案例一:唯一索引等值锁定存在的数据案例二: