Spark Basics

2018-03-31 Programming #code #spark

Introduction

Big data computation is not fundamentally different from an ordinary program: data input => computation => output and persistence of the results. The challenge lies in the efficiency and fault tolerance of the computation. Because the input data is huge, efficiency is a baseline requirement, and the only way to finish that much work efficiently on commodity hardware is to split it up and compute it in a distributed fashion. That in turn raises new problems: managing distributed compute resources (Mesos, YARN), recovering from failures in distributed computation, i.e. fault tolerance (Spark RDD), and distributed data input and storage (the HDFS distributed file system). The Hadoop ecosystem was designed to solve exactly these problems (YARN, MapReduce, HDFS); Spark simply handles the computation step more efficiently and has largely replaced MapReduce. So let's first look at Hadoop's two core components.

HDFS

Basic file operations against a multi-node HDFS cluster:

# Upload a file; -f overwrites an existing file
hadoop fs -put -f jour.txt /user/dahu/jour/
# Download
hadoop fs -get /user/dahu/jour/jour.txt
# List
hadoop fs -ls /user/dahu/
# Delete; -r deletes a directory recursively
hadoop fs -rm /user/dahu/jour/jour.txt
hadoop fs -rm -r /user/dahu/jour
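
Once a file is in HDFS, Spark can read and write it directly by URI. A minimal sketch, assuming an existing SparkContext named sc; the namenode address and output path are placeholders, not part of the commands above:

// Read the file uploaded above ("namenode:8020" is a placeholder for the real namenode address)
val lines = sc.textFile("hdfs://namenode:8020/user/dahu/jour/jour.txt")
println(lines.count())

// Write results back to HDFS as a directory of text part-files
lines.filter(_.nonEmpty).saveAsTextFile("hdfs://namenode:8020/user/dahu/jour_out")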

YARN

(Diagram: YARN architecture)

Spark Basic Concepts

(Diagram: Spark cluster overview)
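
The diagram shows the basic runtime layout: a driver program builds a SparkContext, which asks a cluster manager (standalone, YARN, or Mesos) for executors on worker nodes; transformations are lazy and only actions trigger tasks on those executors. A minimal sketch of that flow; the app name and the local[2] master are placeholder values:

import org.apache.spark.{SparkConf, SparkContext}

// The driver creates a SparkContext, which negotiates executors via the cluster manager
val conf = new SparkConf()
  .setAppName("basic-concepts-demo")
  .setMaster("local[2]") // placeholder; on a real cluster the master is usually set by spark-submit

val sc = new SparkContext(conf)

// Transformations (map) are lazy; the action collect() ships tasks to the executors and gathers the results
val doubled = sc.parallelize(1 to 10).map(_ * 2).collect()
println(doubled.mkString(","))

sc.stop()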

RDD

pass

SparkStreaming

pass

SparkStreaming+Kafka

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092,anotherhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "use_a_separate_group_id_for_each_stream",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics = Array("topicA", "topicB")
// streamingContext is an existing StreamingContext, created elsewhere with a batch interval
val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)

stream.map(record => (record.key, record.value))

Each element of the DStream is a ConsumerRecord<K,V>: a key/value pair received from Kafka, consisting of the topic name and partition number the record came from and the offset that points to the record within that Kafka partition. It exposes methods such as key(), value(), topic(), partition(), and offset().
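
Because enable.auto.commit is set to false in kafkaParams above, offsets are not committed automatically. The spark-streaming-kafka-0-10 API exposes them per RDD; a minimal sketch of the commit pattern (the per-record println is placeholder processing):

stream.foreachRDD { rdd =>
  // RDDs produced by the direct stream carry their Kafka offset ranges
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  rdd.foreach { record =>
    // placeholder processing: topic, partition, offset, key and value are all available
    println(s"${record.topic}-${record.partition} @${record.offset}: ${record.key} -> ${record.value}")
  }

  // Commit the offsets back to Kafka asynchronously once the batch's work is done
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}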

Common Mistakes

Serialization problems with database connections (MySQL, Redis)

dstream.foreachRDD { rdd =>
  val connection = createNewConnection()  // executed at the driver
  rdd.foreach { record =>
    connection.send(record) // executed at the worker
  }
}
// The code above fails with a "Task not serializable" error:
// RDD functions (map, foreach) are serialized and shipped to the worker nodes for execution,
// but the connection wraps a TCP socket bound to one machine and cannot be serialized.

dstream.foreachRDD { rdd =>
  rdd.foreach { record =>  // on worker node
    val connection = createNewConnection() // creating a new connection for every record causes severe connection-setup overhead on the database
    connection.send(record)
    connection.close()
  }
}

// A better approach is to create one connection per partition
dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val connection = createNewConnection() 
    partitionOfRecords.foreach(record => connection.send(record))
    connection.close()
  }
}

// The best approach is to maintain a static connection pool, e.g.
// [ConnectionPool](https://github.com/RedisLabs/spark-redis/blob/master/src/main/scala/com/redislabs/provider/redis/ConnectionPool.scala)
// and then borrow a connection per partition:
dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // ConnectionPool is a static, lazily initialized pool of connections
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
  }
}
// Note that the connections in the pool should be lazily created on demand and timed out if not used for a while. 
// This achieves the most efficient sending of data to external systems.

// Example
import java.util.concurrent.ConcurrentHashMap

import scala.collection.JavaConverters._
import scala.collection.mutable

import org.apache.spark.SparkConf
import redis.clients.jedis.{HostAndPort, JedisCluster, JedisPoolConfig, Protocol}
case class RedisCluster(clusterHosts: String, password: String) extends Serializable {

  def this(conf: SparkConf) {
    this(
      conf.get("spark.redis.host", Protocol.DEFAULT_HOST),
      conf.get("spark.redis.auth", null)
    )
  }

  /**
   * @return the cluster nodes as a set, usable to construct a JedisCluster or JedisPool
   */
  def toSet(): java.util.Set[HostAndPort] = {
    val nodes: mutable.Set[HostAndPort] = mutable.Set()
    for (host_port <- clusterHosts.split(",")) {
      nodes += HostAndPort.from(host_port)
    }
    nodes.asJava
  }

}

object RedisClusterUtils extends Serializable {

  // Per-JVM cache of JedisCluster instances, keyed by cluster definition.
  // Marked @transient + lazy so it is re-created on each executor instead of being serialized.
  @transient private lazy val pools: scala.collection.concurrent.Map[RedisCluster, JedisCluster] =
    new ConcurrentHashMap[RedisCluster, JedisCluster]().asScala

  /**
   * Get (or lazily create) the JedisCluster for the given cluster definition.
   * @param rc the Redis cluster definition
   * @return a cached JedisCluster instance
   */
  def connect(rc: RedisCluster): JedisCluster = {

    pools.getOrElseUpdate(rc, {
      // Pool settings for the underlying per-node Jedis connection pools
      val poolConfig = new JedisPoolConfig()
      poolConfig.setMaxTotal(250)
      poolConfig.setMaxIdle(32)
      poolConfig.setTestOnBorrow(false)
      poolConfig.setTestOnReturn(false)
      poolConfig.setTestWhileIdle(false)
      poolConfig.setNumTestsPerEvictionRun(-1)

      // nodes, connectionTimeout (ms), soTimeout (ms), maxAttempts, password, poolConfig
      val jedisCluster = new JedisCluster(rc.toSet(),
        3000,
        3000,
        5,
        rc.password,
        poolConfig)

      jedisCluster
    })
  }

  /**
   * Look up the exchange rate for a currency.
   * @param jedisCluster the target Redis cluster
   * @param ccyCd currency code
   * @return exchange rate against USD
   */
  def getCcyRatio(jedisCluster: JedisCluster, ccyCd: String): Double = {
    // the stored value is colon-separated; the third field holds the rate against USD
    val res = jedisCluster.get("CCY:" + ccyCd)
    res.split(":")(2).toDouble
  }
}
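
Putting the pieces together, the utilities above can be used from foreachPartition so that each executor JVM reuses a single cached JedisCluster. A hedged sketch; sparkConf and the dstream of (currencyCode, amount) pairs are assumptions for illustration, not part of the original code:

// Assumed for illustration:
//   sparkConf: the application's SparkConf
//   dstream:   DStream[(String, Double)] of (currencyCode, amount) records
val redisCluster = new RedisCluster(sparkConf)

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // connect() returns the per-JVM cached JedisCluster, so nothing new is created per batch
    val jedisCluster = RedisClusterUtils.connect(redisCluster)
    partitionOfRecords.foreach { case (ccyCd, amount) =>
      val usdAmount = amount * RedisClusterUtils.getCcyRatio(jedisCluster, ccyCd)
      // ... handle usdAmount (write out, aggregate, etc.)
    }
    // JedisCluster keeps its own internal pools and is cached for reuse, so it is not closed here
  }
}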

References
Design Patterns for using foreachRDD
Redis on Spark: Task not serializable
How to create connection(s) to a Datasource in Spark Streaming for Lookups

Best Practices