博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
SparkStreaming 如何保证消费Kafka的数据不丢失不重复
阅读量:4206 次
发布时间:2019-05-26

本文共 11961 字,大约阅读时间需要 39 分钟。

目录

Receiver接收数据和采用Direct 方式。

(1)一个Receiver效率低,需要开启多个线程,手动合并数据再进行处理,并且Receiver方式为确保零数据丢失,需要开启WAL(预写日志)保证数据安全,这将同步保存所有收到的Kafka数据到HDFS,以便在发生故障时可以恢复所有数据。尽管WAL可以保证数据零丢失,但是不能保证exactly-once。Receivers接收完数据并保存到HDFS,但是在更新offset前,receivers失败了,Kafka以为数据没有接收成功,因为offset没有更新到ZooKeeper。随后receiver恢复了,从WAL可以读取的数据重新消费一次,因为使用Kafka高阶API,从ZooKeeper中保存的offsets开始消费

(2)Direct方式依靠checkpoint机制来保证。每次Spark Streaming消费了Kafka的数据后,将消费的Kafka offsets更新到checkpoint,消除了与ZooKeeper不一致的情况且可以从每个分区直接读取数据大大提高了并行能力。当你的程序挂掉或者升级的时候,就可以接着上次的读取,实现数据的零丢失。但是checkpoint太笨拙,因此可以自主管理offset,选取HBaseZooKeeperMySQLRedisKafka 等,保存对应topic下每个分区的offset,但是要注意当topic的新增分区的可能

  这里采用的方案:Spark Streamming使用Direct方式读取Kafka,利用数据库的事务, 将结果及offset一起写入到MySQL数据库中,保证是一个事务操作从而幂等性。

实现思路:

1)利用consumer api的seek方法可以指定offset进行消费,在启动消费者时查询数据库中记录的offset信息,如果是第一次启动,那么数据库中将没有offset信息,需要进行消费的元数据插入,然后从offset=0开始消费
2)关系型数据库具备事务的特性,当数据入库时,同时也将offset信息更新,借用关系型数据库事务的特性保证数据入库和修改offset记录这两个操作是在同一个事务中进行
3)使用ConsumerRebalanceListener来完成在分配分区时和Relalance时作出相应的处理逻辑

记录kafka信息表设计

create table kafka_info(	topic_group_partition varchar(32) primary key, //主题+组名+分区号 这里冗余设计方便通过这个主键进行更新提升效率 	topic_group varchar(30), //主题和组名	partition_num tinyint,//分区号	offsets bigint default 0 //offset信息);

代码实现

import com.alibaba.fastjson.JSON;import org.apache.kafka.clients.consumer.*;import org.apache.kafka.common.PartitionInfo;import org.apache.kafka.common.TopicPartition;import java.sql.Connection;import java.sql.PreparedStatement;import java.sql.ResultSet;import java.sql.SQLException;import java.time.Duration;import java.util.*;public class AccurateConsumer {
private static final Properties props = new Properties(); private static final String GROUP_ID = "Test"; static {
props.put("bootstrap.servers", "192.168.142.139:9092"); props.put("group.id", GROUP_ID); props.put("enable.auto.commit", false);//注意这里设置为手动提交方式 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); } final KafkaConsumer
consumer; //用于记录每次消费时每个partition的最新offset private Map
partitionOffsetMap; //用于缓存接受消息,然后进行批量入库 private List
list; private volatile boolean isRunning = true; private final String topicName; private final String topicNameAndGroupId; public AccurateConsumer(String topicName) {
this.topicName = topicName; topicNameAndGroupId = topicName + "_" + GROUP_ID; consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList(topicName), new HandleRebalance()); list = new ArrayList<>(100); partitionOffsetMap = new HashMap<>(); } //这里使用异步提交和同步提交的组合方式 public void receiveMsg() {
try {
while (isRunning) {
ConsumerRecords
consumerRecords = consumer.poll(Duration.ofSeconds(1)); if (!consumerRecords.isEmpty()) {
for (TopicPartition topicPartition : consumerRecords.partitions()) {
List
> records = consumerRecords.records(topicPartition); for (ConsumerRecord
record : records) { //使用fastjson将记录中的值转换为Message对象,并添加到list中 list.addAll(JSON.parseArray(record.value(), Message.class)); } //将partition对应的offset信息添加到map中,入库时将offset-partition信息一起进行入库 partitionOffsetMap.put(topicPartition, records.get(records.size() - 1) .offset() + 1);//记住这里一定要加1,因为下次消费的位置就是从+1的位置开始 } } //如果list中存在有数据,则进行入库操作 if (list.size() > 0) { boolean isSuccess = insertIntoDB(list, partitionOffsetMap); if (isSuccess) { //将缓存数据清空,并将offset信息清空 list.clear(); partitionOffsetMap.clear(); } } } } catch (Exception e) { //处理异常 } finally { //offset信息由我们自己保存,提交offset其实没有什么必要 //consumer.commitSync(); close(); } } private boolean insertIntoDB(List
list, Map
partitionOffsetMap) { Connection connection = getConnection();//获取数据库连接 自行实现 boolean flag = false; try { //设置手动提交,让插入数据和更新offset信息在一个事务中完成 connection.setAutoCommit(false); insertMessage(list);//将数据进行入库 自行实现 updateOffset(partitionOffsetMap);//更新offset信息 自行实现 connection.commit(); flag = true; } catch (SQLException e) { try { //出现异常则回滚事务 connection.rollback(); } catch (SQLException e1) { //处理异常 } } return flag; } //获取数据库连接 自行实现 private Connection getConnection() { return null; } public void close() { isRunning = false; if (consumer != null) { consumer.close(); } } private class HandleRebalance implements ConsumerRebalanceListener { @Override public void onPartitionsRevoked(Collection
partitions) { //发生Rebalance时,只需要将list中数据和记录offset信息清空即可 //这里为什么要清除数据,应为在Rebalance的时候有可能还有一批缓存数据在内存中没有进行入库, //并且offset信息也没有更新,如果不清除,那么下一次还会重新poll一次这些数据,将会导致数据重复 list.clear(); partitionOffsetMap.clear(); } @Override public void onPartitionsAssigned(Collection
partitions) { //获取对应Topic的分区数 List
partitionInfos = consumer.partitionsFor(topicName); Map
partitionOffsetMapFromDB = getPartitionOffsetMapFromDB (partitionInfos.size()); //在分配分区时指定消费位置 for (TopicPartition partition : partitions) { //如果在数据库中有对应partition的信息则使用,否则将默认从offset=0开始消费 if (partitionOffsetMapFromDB.get(partition) != null) { consumer.seek(partition, partitionOffsetMapFromDB.get(partition)); } else { consumer.seek(partition, 0L); } } } } /** * 从数据库中查询分区和offset信息 * @param size 分区数量 * @return 分区号和offset信息 */ private Map
getPartitionOffsetMapFromDB(int size) { Map
partitionOffsetMapFromDB = new HashMap<>(); //从数据库中查询出对应信息 Connection connection = getConnection();//获取数据库连接 自行实现 PreparedStatement preparedStatement = null; ResultSet resultSet = null; String querySql = "SELECT partition_num,offsets from kafka_info WHERE topic_group = ?"; try { preparedStatement = connection.prepareStatement(querySql); preparedStatement.setString(1, topicNameAndGroupId); resultSet = preparedStatement.executeQuery(); while (resultSet.next()) { partitionOffsetMapFromDB.put(new TopicPartition(topicName, resultSet.getInt(1)), resultSet.getLong(2)); } //判断数据库是否存在所有的分区的信息,如果没有,则需要进行初始化 if (partitionOffsetMapFromDB.size() < size) { connection.setAutoCommit(false); StringBuilder sqlBuilder = new StringBuilder(); //partition分区号是从0开始,如果有10个分区,那么分区号就是0-9 /*这里拼接插入数据 格式 INSERT INTO kafka_info(topic_group_partition,topic_group,partition_num) VALUES (topicNameAndGroupId_0,topicNameAndGroupId,0),(topicNameAndGroupId_1, topicNameAndGroupId,1)....*/ for (int i = 0; i < size; i++) { sqlBuilder.append("(").append (topicNameAndGroupId).append("_").append(i).append(",").append (topicNameAndGroupId).append(",").append(i).append("),"); } //将最后一个逗号去掉加上分号结束 sqlBuilder.deleteCharAt(sqlBuilder.length() - 1).append(";"); preparedStatement = connection.prepareStatement("INSERT INTO kafa_info" + "(topic_group_partition,topic_group,partition_num) VALUES " + sqlBuilder.toString()); preparedStatement.execute(); connection.commit(); } } catch (SQLException e) { //处理异常 回滚事务 这里应该结束程序 排查错误 try { connection.rollback(); } catch (SQLException e1) { //打印日志 排查错误信息 } } finally { try { if (resultSet != null) { resultSet.close(); } if (preparedStatement != null) { preparedStatement.close(); } if (connection != null) { connection.close(); } } catch (SQLException e) { //处理异常 打印日志即可 关闭资源失败 } } return partitionOffsetMapFromDB; }}

数据对象

public class Message {
private String id; private String name; private Syring desc; private Date time; //get set toString 方法省略}

————————————————————————————

数据库表

create table kafka_offset(group_id varchar(30),topic varchar(30),partition_id int(5),fromOffset bigint(18),untilOffset bigint(18),primary key(topic,group_id,partition_id));

2.scalajdbc连接配置(在resources资源目录下:application.conf)

db.default.driver="com.mysql.jdbc.Driver"db.default.url="jdbc:mysql://127.0.0.1:3306/rz?characterEncoding=utf8&useSSL=false&serverTimezone=UTC&rewriteBatchedStatements=true"db.default.user="root"db.default.password="123456"

3.pom.xml

2.11.12
2.4.2
6.6.2
org.scala-lang
scala-library
${
scala.version}
org.apache.zookeeper
zookeeper
3.4.6
org.apache.spark
spark-streaming_2.11
${
spark.version}
org.apache.spark
spark-streaming-kafka-0-10_2.11
${
spark.version}
mysql
mysql-connector-java
5.1.24
org.scalikejdbc
scalikejdbc_2.11
3.3.2
org.scalikejdbc
scalikejdbc-config_2.11
3.3.2

Streaming demo

import org.apache.kafka.common.TopicPartitionimport org.apache.kafka.common.serialization.StringDeserializerimport org.apache.spark.streaming.kafka010._import scalikejdbc._import org.apache.spark.streaming.{
Seconds, StreamingContext}import org.apache.spark.{
SparkConf, TaskContext}/** * 自定义offset存储 */object DefinedOffSetApp {
private val user = "root" private val password = "123456" private val url = "jdbc:mysql://127.0.0.1:3306/rz?characterEncoding=utf8&useSSL=false&serverTimezone=UTC&rewriteBatchedStatements=true" def main(args: Array[String]): Unit = {
//初始化spark streaming应用程序 val conf = new SparkConf().setAppName("DefinedOffSetApp").setMaster("local[2]") .set("spark.streaming.stopGracefullyOnShutdown","true") val ssc = new StreamingContext(conf,Seconds(5)) //kafka consumer的参数配置 val kafkaParams = Map[String, Object]( "bootstrap.servers" -> "localhost:9092", "key.deserializer" -> classOf[StringDeserializer], "value.deserializer" -> classOf[StringDeserializer], "group.id" -> "random", "auto.offset.reset" -> "latest", //必须改为false "enable.auto.commit" -> (false: java.lang.Boolean) ) //初始化scalikejdbc scalikejdbc.config.DBs.setupAll() //MySQL中获取最新的指定groupid和topic的offset信息,若不存则返回空的map val fromOffsets = DB.readOnly( implicit session => {
//这里应为表里只有这个topic和groupid信息,所以没做where 过滤。这点需要注意 SQL("select * from kafka_offset").map(rs => {
new TopicPartition(rs.string("topic"),rs.int("partition_id")) -> rs.long("untilOffset") }).list().apply() }).toMap val topics = Array("streaming-topic") //获取DirectStream从指定的partition的offset开始消费,若fromOffsets为空,这从头开始消费 val stream = if(fromOffsets.isEmpty) KafkaUtils.createDirectStream[String, String]( ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String,String](topics,kafkaParams) ) else KafkaUtils.createDirectStream[String, String]( ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets) ) //获取当前批次的offset信息 var offsetRanges:Array[OffsetRange] = Array.empty stream.transform(rdd => {
offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges rdd.foreachPartition{
iter => val o: OffsetRange = offsetRanges(TaskContext.get.partitionId) println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}") } rdd }).map(x => (x.key(),x.value()) ).foreachRDD(rdd => {
rdd(println) //遍历不同分区的offset信息,并更新在MySQL中 offsetRanges.foreach(x => {
DB.autoCommit( implicit session => {
SQL("replace into kafka_offset(topic,group_id,partition_id,fromOffset,untilOffset) values (?,?,?,?,?)") .bind(x.topic,"random",x.partition,x.fromOffset,x.untilOffset) .update().apply() }) }) } ) ssc.start() ssc.awaitTermination() }}

转载地址:http://iyhli.baihongyu.com/

你可能感兴趣的文章
如何给分类增加一个属性(后台)
查看>>
linux设置环境变量 临时设置 和 永久设置
查看>>
mysql数据库主从同步的问题解决方法
查看>>
LoadRunner如何在脚本运行时修改log设置选项?
查看>>
QC数据库表结构
查看>>
自动化测试工具的3个关键部分
查看>>
测试工具厂商的编程语言什么时候“退休”?
查看>>
资源监控工具 - Hyperic HQ
查看>>
LoadRunner中Concurrent与Simultaneous的区别
查看>>
SiteScope - Agentless监控
查看>>
QTP测试.NET控件CheckedListBox
查看>>
使用QTP的.NET插件扩展技术测试ComponentOne的ToolBar控件
查看>>
用上帝之眼进行自动化测试
查看>>
为LoadRunner写一个lr_save_float函数
查看>>
PrefTest工作室全新力作-《性能测试与调优实战》课程视频即将上线
查看>>
质量度量分析与测试技术 培训大纲
查看>>
欢迎加入【亿能测试快讯】邮件列表!
查看>>
为什么我们的自动化测试“要”这么难
查看>>
LoadRunner性能脚本开发实战训练
查看>>
测试之途,前途?钱途?图何?
查看>>