本文共 11961 字,大约阅读时间需要 39 分钟。
Receiver接收数据和采用Direct 方式。
(1)一个Receiver效率低,需要开启多个线程,手动合并数据再进行处理,并且Receiver方式为确保零数据丢失,需要开启WAL(预写日志)保证数据安全,这将同步保存所有收到的Kafka数据到HDFS,以便在发生故障时可以恢复所有数据。尽管WAL可以保证数据零丢失,但是不能保证exactly-once。Receivers接收完数据并保存到HDFS,但是在更新offset前,receivers失败了,Kafka以为数据没有接收成功,因为offset没有更新到ZooKeeper。随后receiver恢复了,从WAL可以读取的数据重新消费一次,因为使用Kafka高阶API,从ZooKeeper中保存的offsets开始消费
(2)Direct方式依靠checkpoint机制来保证。每次Spark Streaming消费了Kafka的数据后,将消费的Kafka offsets更新到checkpoint,消除了与ZooKeeper不一致的情况且可以从每个分区直接读取数据大大提高了并行能力。当你的程序挂掉或者升级的时候,就可以接着上次的读取,实现数据的零丢失。但是checkpoint太笨拙,因此可以自主管理offset,选取HBase、ZooKeeper、MySQL、Redis或Kafka 等,保存对应topic下每个分区的offset,但是要注意当topic的新增分区的可能
这里采用的方案:Spark Streamming使用Direct方式读取Kafka,利用数据库的事务, 将结果及offset一起写入到MySQL数据库中,保证是一个事务操作从而幂等性。
实现思路:
1)利用consumer api的seek方法可以指定offset进行消费,在启动消费者时查询数据库中记录的offset信息,如果是第一次启动,那么数据库中将没有offset信息,需要进行消费的元数据插入,然后从offset=0开始消费 2)关系型数据库具备事务的特性,当数据入库时,同时也将offset信息更新,借用关系型数据库事务的特性保证数据入库和修改offset记录这两个操作是在同一个事务中进行 3)使用ConsumerRebalanceListener来完成在分配分区时和Relalance时作出相应的处理逻辑记录kafka信息表设计
create table kafka_info( topic_group_partition varchar(32) primary key, //主题+组名+分区号 这里冗余设计方便通过这个主键进行更新提升效率 topic_group varchar(30), //主题和组名 partition_num tinyint,//分区号 offsets bigint default 0 //offset信息);
代码实现
import com.alibaba.fastjson.JSON;import org.apache.kafka.clients.consumer.*;import org.apache.kafka.common.PartitionInfo;import org.apache.kafka.common.TopicPartition;import java.sql.Connection;import java.sql.PreparedStatement;import java.sql.ResultSet;import java.sql.SQLException;import java.time.Duration;import java.util.*;public class AccurateConsumer { private static final Properties props = new Properties(); private static final String GROUP_ID = "Test"; static { props.put("bootstrap.servers", "192.168.142.139:9092"); props.put("group.id", GROUP_ID); props.put("enable.auto.commit", false);//注意这里设置为手动提交方式 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); } final KafkaConsumerconsumer; //用于记录每次消费时每个partition的最新offset private Map partitionOffsetMap; //用于缓存接受消息,然后进行批量入库 private List list; private volatile boolean isRunning = true; private final String topicName; private final String topicNameAndGroupId; public AccurateConsumer(String topicName) { this.topicName = topicName; topicNameAndGroupId = topicName + "_" + GROUP_ID; consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList(topicName), new HandleRebalance()); list = new ArrayList<>(100); partitionOffsetMap = new HashMap<>(); } //这里使用异步提交和同步提交的组合方式 public void receiveMsg() { try { while (isRunning) { ConsumerRecords consumerRecords = consumer.poll(Duration.ofSeconds(1)); if (!consumerRecords.isEmpty()) { for (TopicPartition topicPartition : consumerRecords.partitions()) { List > records = consumerRecords.records(topicPartition); for (ConsumerRecord record : records) { //使用fastjson将记录中的值转换为Message对象,并添加到list中 list.addAll(JSON.parseArray(record.value(), Message.class)); } //将partition对应的offset信息添加到map中,入库时将offset-partition信息一起进行入库 partitionOffsetMap.put(topicPartition, records.get(records.size() - 1) .offset() + 1);//记住这里一定要加1,因为下次消费的位置就是从+1的位置开始 } } //如果list中存在有数据,则进行入库操作 if (list.size() > 0) { boolean isSuccess = insertIntoDB(list, partitionOffsetMap); if (isSuccess) { //将缓存数据清空,并将offset信息清空 list.clear(); partitionOffsetMap.clear(); } } } } catch (Exception e) { //处理异常 } finally { //offset信息由我们自己保存,提交offset其实没有什么必要 //consumer.commitSync(); close(); } } private boolean insertIntoDB(List list, Map partitionOffsetMap) { Connection connection = getConnection();//获取数据库连接 自行实现 boolean flag = false; try { //设置手动提交,让插入数据和更新offset信息在一个事务中完成 connection.setAutoCommit(false); insertMessage(list);//将数据进行入库 自行实现 updateOffset(partitionOffsetMap);//更新offset信息 自行实现 connection.commit(); flag = true; } catch (SQLException e) { try { //出现异常则回滚事务 connection.rollback(); } catch (SQLException e1) { //处理异常 } } return flag; } //获取数据库连接 自行实现 private Connection getConnection() { return null; } public void close() { isRunning = false; if (consumer != null) { consumer.close(); } } private class HandleRebalance implements ConsumerRebalanceListener { @Override public void onPartitionsRevoked(Collection partitions) { //发生Rebalance时,只需要将list中数据和记录offset信息清空即可 //这里为什么要清除数据,应为在Rebalance的时候有可能还有一批缓存数据在内存中没有进行入库, //并且offset信息也没有更新,如果不清除,那么下一次还会重新poll一次这些数据,将会导致数据重复 list.clear(); partitionOffsetMap.clear(); } @Override public void onPartitionsAssigned(Collection partitions) { //获取对应Topic的分区数 List partitionInfos = consumer.partitionsFor(topicName); Map partitionOffsetMapFromDB = getPartitionOffsetMapFromDB (partitionInfos.size()); //在分配分区时指定消费位置 for (TopicPartition partition : partitions) { //如果在数据库中有对应partition的信息则使用,否则将默认从offset=0开始消费 if (partitionOffsetMapFromDB.get(partition) != null) { consumer.seek(partition, partitionOffsetMapFromDB.get(partition)); } else { consumer.seek(partition, 0L); } } } } /** * 从数据库中查询分区和offset信息 * @param size 分区数量 * @return 分区号和offset信息 */ private Map getPartitionOffsetMapFromDB(int size) { Map partitionOffsetMapFromDB = new HashMap<>(); //从数据库中查询出对应信息 Connection connection = getConnection();//获取数据库连接 自行实现 PreparedStatement preparedStatement = null; ResultSet resultSet = null; String querySql = "SELECT partition_num,offsets from kafka_info WHERE topic_group = ?"; try { preparedStatement = connection.prepareStatement(querySql); preparedStatement.setString(1, topicNameAndGroupId); resultSet = preparedStatement.executeQuery(); while (resultSet.next()) { partitionOffsetMapFromDB.put(new TopicPartition(topicName, resultSet.getInt(1)), resultSet.getLong(2)); } //判断数据库是否存在所有的分区的信息,如果没有,则需要进行初始化 if (partitionOffsetMapFromDB.size() < size) { connection.setAutoCommit(false); StringBuilder sqlBuilder = new StringBuilder(); //partition分区号是从0开始,如果有10个分区,那么分区号就是0-9 /*这里拼接插入数据 格式 INSERT INTO kafka_info(topic_group_partition,topic_group,partition_num) VALUES (topicNameAndGroupId_0,topicNameAndGroupId,0),(topicNameAndGroupId_1, topicNameAndGroupId,1)....*/ for (int i = 0; i < size; i++) { sqlBuilder.append("(").append (topicNameAndGroupId).append("_").append(i).append(",").append (topicNameAndGroupId).append(",").append(i).append("),"); } //将最后一个逗号去掉加上分号结束 sqlBuilder.deleteCharAt(sqlBuilder.length() - 1).append(";"); preparedStatement = connection.prepareStatement("INSERT INTO kafa_info" + "(topic_group_partition,topic_group,partition_num) VALUES " + sqlBuilder.toString()); preparedStatement.execute(); connection.commit(); } } catch (SQLException e) { //处理异常 回滚事务 这里应该结束程序 排查错误 try { connection.rollback(); } catch (SQLException e1) { //打印日志 排查错误信息 } } finally { try { if (resultSet != null) { resultSet.close(); } if (preparedStatement != null) { preparedStatement.close(); } if (connection != null) { connection.close(); } } catch (SQLException e) { //处理异常 打印日志即可 关闭资源失败 } } return partitionOffsetMapFromDB; }}
数据对象
public class Message { private String id; private String name; private Syring desc; private Date time; //get set toString 方法省略}
————————————————————————————
数据库表create table kafka_offset(group_id varchar(30),topic varchar(30),partition_id int(5),fromOffset bigint(18),untilOffset bigint(18),primary key(topic,group_id,partition_id));
2.scalajdbc连接配置(在resources资源目录下:application.conf)
db.default.driver="com.mysql.jdbc.Driver"db.default.url="jdbc:mysql://127.0.0.1:3306/rz?characterEncoding=utf8&useSSL=false&serverTimezone=UTC&rewriteBatchedStatements=true"db.default.user="root"db.default.password="123456"
3.pom.xml
2.11.12 2.4.2 6.6.2 org.scala-lang scala-library ${ scala.version} org.apache.zookeeper zookeeper 3.4.6 org.apache.spark spark-streaming_2.11 ${ spark.version} org.apache.spark spark-streaming-kafka-0-10_2.11 ${ spark.version} mysql mysql-connector-java 5.1.24 org.scalikejdbc scalikejdbc_2.11 3.3.2 org.scalikejdbc scalikejdbc-config_2.11 3.3.2
Streaming demo
import org.apache.kafka.common.TopicPartitionimport org.apache.kafka.common.serialization.StringDeserializerimport org.apache.spark.streaming.kafka010._import scalikejdbc._import org.apache.spark.streaming.{ Seconds, StreamingContext}import org.apache.spark.{ SparkConf, TaskContext}/** * 自定义offset存储 */object DefinedOffSetApp { private val user = "root" private val password = "123456" private val url = "jdbc:mysql://127.0.0.1:3306/rz?characterEncoding=utf8&useSSL=false&serverTimezone=UTC&rewriteBatchedStatements=true" def main(args: Array[String]): Unit = { //初始化spark streaming应用程序 val conf = new SparkConf().setAppName("DefinedOffSetApp").setMaster("local[2]") .set("spark.streaming.stopGracefullyOnShutdown","true") val ssc = new StreamingContext(conf,Seconds(5)) //kafka consumer的参数配置 val kafkaParams = Map[String, Object]( "bootstrap.servers" -> "localhost:9092", "key.deserializer" -> classOf[StringDeserializer], "value.deserializer" -> classOf[StringDeserializer], "group.id" -> "random", "auto.offset.reset" -> "latest", //必须改为false "enable.auto.commit" -> (false: java.lang.Boolean) ) //初始化scalikejdbc scalikejdbc.config.DBs.setupAll() //MySQL中获取最新的指定groupid和topic的offset信息,若不存则返回空的map val fromOffsets = DB.readOnly( implicit session => { //这里应为表里只有这个topic和groupid信息,所以没做where 过滤。这点需要注意 SQL("select * from kafka_offset").map(rs => { new TopicPartition(rs.string("topic"),rs.int("partition_id")) -> rs.long("untilOffset") }).list().apply() }).toMap val topics = Array("streaming-topic") //获取DirectStream从指定的partition的offset开始消费,若fromOffsets为空,这从头开始消费 val stream = if(fromOffsets.isEmpty) KafkaUtils.createDirectStream[String, String]( ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String,String](topics,kafkaParams) ) else KafkaUtils.createDirectStream[String, String]( ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets) ) //获取当前批次的offset信息 var offsetRanges:Array[OffsetRange] = Array.empty stream.transform(rdd => { offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges rdd.foreachPartition{ iter => val o: OffsetRange = offsetRanges(TaskContext.get.partitionId) println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}") } rdd }).map(x => (x.key(),x.value()) ).foreachRDD(rdd => { rdd(println) //遍历不同分区的offset信息,并更新在MySQL中 offsetRanges.foreach(x => { DB.autoCommit( implicit session => { SQL("replace into kafka_offset(topic,group_id,partition_id,fromOffset,untilOffset) values (?,?,?,?,?)") .bind(x.topic,"random",x.partition,x.fromOffset,x.untilOffset) .update().apply() }) }) } ) ssc.start() ssc.awaitTermination() }}
转载地址:http://iyhli.baihongyu.com/