在线赌币机 始末扩展 Spark SQL ,打造本身的大数据分析引擎

Spark SQL 的 Catalyst ,这片面真的很有有趣,值得往仔细钻研一番,今天先来说说Spark的一些扩展机制吧,上一次写Spark,对其SQL的解析进走了必定的魔改,今天吾们按套路来,行使砖厂为吾们挑供的机制,来扩展Spark...

今天(7月31日)下午,国务院联防联控机制召开新闻发布会,介绍疫情防控和疫苗接种有关情况。

禄口国际机场抗疫防线何以失守?

在经历了7月1日到7月15日历史数据统计的“史上最强劲两周”后,美股即将迎来统计数据上的最糟糕月份——八月。

回顾上半年,定调从4月的乐观转为稳定。首先应看到,本次政治局会议对上半年经济情况的回顾定调是稳定,而4月回顾开年经济工作定调为“开局良好”,时隔一个季度,政治局会议的关注点正在向一些结构性偏弱的潜在风险侧重。但二季度以来,包括大宗商品价格飙升抑制消费和投资需求、外部环境趋于复杂外需或将降温、产业链安全性受到更严峻考验等结构性风险点逐步涌现,令本次会议整体定调转为谨慎。这一方向性的转变,意味着下半年各项经济政策将从更深层次的稳增长、稳结构的角度出发展开,而已经与疫情和疫后恢复这样的短期逻辑、以及市场的短期预期扰动相关度非常之低了。

一只偏股基金单日可以上涨10%以上,在正常情况下,这件事情即便不是绝无仅有,也是寥若晨星。

最先吾们先来晓畅一下 Spark SQL 的团体实走流程,输入的查询先被解析成未有关元数据的逻辑计划,然后按照元数据休争析规则,生成逻辑计划,再经过优化规则,形成优化过的逻辑计划(RBO),将逻辑计划转换成物理计划在经过代价模型(CBO),输出真实的物理实走计划。

吾们今天举三个扩展的例子,来进走表明。

扩展解析器

这个例子,吾们扩展解析引擎,吾们对输入的SQL,不准泛查询即不许行使select *来做查询,以下是解析的代。

package wang.datahub.parser  import org.apache.spark.sql.catalyst.analysis.UnresolvedStar import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.parser.ParserInterface import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project} import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} import org.apache.spark.sql.types.{DataType, StructType}  class MyParser(parser: ParserInterface) extends ParserInterface {  /**   * Parse a string to a [[LogicalPlan]].   */  override def parsePlan(sqlText: String): LogicalPlan = {    val logicalPlan = parser.parsePlan(sqlText)    logicalPlan transform {      case project &@nbsp;Project(projectList, _) =>        projectList.foreach {          name =>            if (name.isInstanceOf[UnresolvedStar]) {              throw new RuntimeException("You must specify your project column set," +                " * is not allowed.")           }       }        project   }    logicalPlan }   /**   * Parse a string to an [[Expression]].   */  override def parseExpression(sqlText: String): Expression = parser.parseExpression(sqlText)   /**   * Parse a string to a [[TableIdentifier]].   */  override def parseTableIdentifier(sqlText: String): TableIdentifier =    parser.parseTableIdentifier(sqlText)   /**   * Parse a string to a [[FunctionIdentifier]].   */  override def parseFunctionIdentifier(sqlText: String): FunctionIdentifier =    parser.parseFunctionIdentifier(sqlText)   /**   * Parse a string to a [[StructType]]. The passed SQL string should be a comma separated   * list of field definitions which will preserve the correct Hive metadata.   */  override def parseTableSchema(sqlText: String): StructType =    parser.parseTableSchema(sqlText)   /**   * Parse a string to a [[DataType]].   */  override def parseDataType(sqlText: String): DataType = parser.parseDataType(sqlText) } 

接下来,吾们测试一下

package wang.datahub.parser  import org.apache.spark.sql.{SparkSession, SparkSessionExtensions} import org.apache.spark.sql.catalyst.parser.ParserInterface  object MyParserApp {  def main(args: Array[String]): Unit = {    System.setProperty("hadoop.home.dir","E:\\devlop\\envs\\hadoop-common-2.2.0-bin-master");    type ParserBuilder = (SparkSession, ParserInterface) => ParserInterface    type ExtensionsBuilder = SparkSessionExtensions => Unit    val parserBuilder: ParserBuilder = (_, parser) => new MyParser(parser)    val extBuilder: ExtensionsBuilder = { e => e.injectParser(parserBuilder)}    val spark =  SparkSession     .builder()     .appName("Spark SQL basic example")     .config("spark.master", "local[*]")     .withExtensions(extBuilder)     .getOrCreate()     spark.sparkContext.setLogLevel("ERROR")     import spark.implicits._     val df = Seq(     ( "First Value",1, java.sql.Date.valueOf("2010-01-01")),     ( "First Value",4, java.sql.Date.valueOf("2010-01-01")),     ("Second Value",2,  java.sql.Date.valueOf("2010-02-01")),     ("Second Value",9,  java.sql.Date.valueOf("2010-02-01"))   ).toDF("name", "score", "date_column")    df.createTempView("p")     //   val df = spark.read.json("examples/src/main/resources/people.json")    //   df.toDF().write.saveAsTable("person")    //,javg(score)     // custom parser    //   spark.sql("select * from p ").show     spark.sql("select * from p").show() } } 

下面是实走终局,相符吾们的预期。

扩展优化器

接下来,吾们来扩展优化器,砖厂挑供了许多默认的RBO,这边能够方便的构建吾们本身的优化规则,本例中吾们构建一套比较清新的规则,而且是十足不等价的,这边只是为了表明。

针对字段+0的操作,规则如下:

鸿蒙官方战略配相符共建——HarmonyOS技术社区 倘若0出现在+左边,则直接将字段变成右外达式,即 0+nr 等效为 nr 倘若0出现在+右边,则将0变成3,即 nr+0 变成 nr+3 倘若没展现0,则外达式不变

下面是代码:

package wang.datahub.optimizer  import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.expressions.{Add, Expression, Literal} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.rules.Rule  object MyOptimizer extends Rule[LogicalPlan] {   def apply(logicalPlan: LogicalPlan): LogicalPlan = {    logicalPlan.transformAllExpressions {      case Add(left, right) => {        println("this this my add optimizer")        if (isStaticAdd(left)) {          right       } else if (isStaticAdd(right)) {          Add(left, Literal(3L))       } else {          Add(left, right)       }     }   } }   private def isStaticAdd(expression: Expression): Boolean = {    expression.isInstanceOf[Literal] && expression.asInstanceOf[Literal].toString == "0" }   def main(args: Array[String]): Unit = {    System.setProperty("hadoop.home.dir","E:\\devlop\\envs\\hadoop-common-2.2.0-bin-master");    val testSparkSession: SparkSession = SparkSession.builder().appName("Extra optimization rules")     .master("local[*]")     .withExtensions(extensions => {        extensions.injectOptimizerRule(session => MyOptimizer)     })     .getOrCreate()     testSparkSession.sparkContext.setLogLevel("ERROR")     import testSparkSession.implicits._    testSparkSession.experimental.extraOptimizations = Seq()    Seq(-1, -2, -3).toDF("nr").write.mode("overwrite").json("./test_nrs") //   val optimizedResult = testSparkSession.read.json("./test_nrs").selectExpr("nr + 0")    testSparkSession.read.json("./test_nrs").createTempView("p")     var sql = "select nr+0 from p";    var t = testSparkSession.sql(sql)    println(t.queryExecution.optimizedPlan)    println(sql)    t.show()     sql = "select 0+nr from p";    var  u = testSparkSession.sql(sql)    println(u.queryExecution.optimizedPlan)    println(sql)    u.show()     sql = "select nr+8 from p";    var  v = testSparkSession.sql(sql)    println(v.queryExecution.optimizedPlan)    println(sql)    v.show() //   println(optimizedResult.queryExecution.optimizedPlan.toString() ) //   optimizedResult.collect().map(row => row.getAs[Long]("(nr + 0)"))    Thread.sleep(1000000) }  } 

实走如下

this this my add optimizer this this my add optimizer this this my add optimizer Project [(nr#12L + 3) AS (nr + CAST(0 AS BIGINT))#14L] +- Relation[nr#12L] json  select nr+0 from p this this my add optimizer this this my add optimizer this this my add optimizer +------------------------+ |(nr + CAST(0 AS BIGINT))| +------------------------+ |                       2| |                       1| |                       0| +------------------------+  this this my add optimizer Project [nr#12L AS (CAST(0 AS BIGINT) + nr)#21L] +- Relation[nr#12L] json  select 0+nr from p this this my add optimizer +------------------------+ |(CAST(0 AS BIGINT) + nr)| +------------------------+ |                     -1| |                     -2| |                     -3| +------------------------+  this this my add optimizer this this my add optimizer this this my add optimizer Project [(nr#12L + 8) AS (nr + CAST(8 AS BIGINT))#28L] +- Relation[nr#12L] json  select nr+8 from p this this my add optimizer this this my add optimizer this this my add optimizer +------------------------+ |(nr + CAST(8 AS BIGINT))| +------------------------+ |                       7| |                       6| |                       5| +------------------------+ 
扩展策略

SparkStrategies包含了一系列特定的Strategies,这些Strategies是继承自QueryPlanner中定义的Strategy,它定义批准一个Logical Plan,生成一系列的Physical Plan

始末Strategies把逻辑计划转换成能够详细实走的物理计划,代码如下

package wang.datahub.strategy  import org.apache.spark.sql.{SparkSession, Strategy} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.SparkPlan  object MyStrategy extends Strategy {  def apply(plan: LogicalPlan): Seq[SparkPlan] = {    println("Hello world!")    Nil }   def main(args: Array[String]): Unit = {    System.setProperty("hadoop.home.dir","E:\\devlop\\envs\\hadoop-common-2.2.0-bin-master");    val spark = SparkSession.builder().master("local").getOrCreate()     spark.experimental.extraStrategies = Seq(MyStrategy)    val q = spark.catalog.listTables.filter(t => t.name == "six")    q.explain(true)    spark.stop() } } 

实走成果

益了,扩展片面就先介绍到这,接下来吾计划能够会浅易说说RBO和CBO,结相符之前做过的一个幼功能,一条SQL的查询时间预估。

本文转载自微信公多号「麒思妙想」,能够始末以下二维码关注。转载本文请有关麒思妙想公多号。

【编辑保举】在线赌币机

鸿蒙官方战略配相符共建——HarmonyOS技术社区 关于MySQL库外名大幼写题目 MySQL next-key lock 添锁周围是什么? 数据库篇:MySQL内置函数 MySql+SSCMS编制 MySQL 添锁周围三——清淡索引和清淡字段