
golang: setting a Kafka consumer's offset to the latest

Published: 2022/6/28 20:17:23

Problem to solve:

When consuming a Kafka partition under a fixed group_id, a crash can leave already-consumed messages uncommitted, so those messages would be consumed again on restart. New messages also accumulate while the program is down, and we do not want to consume those either: after a restart, consumption should begin at the latest offset.

Approaches tried:

1. Set the offset to newest when creating the consumer

import (
	"github.com/Shopify/sarama"
	cluster "github.com/bsm/sarama-cluster"
)
 
config := cluster.NewConfig()
config.Version = sarama.V1_1_0_0
config.Consumer.Offsets.Initial = sarama.OffsetNewest
config.Consumer.Return.Errors = true
config.Group.Return.Notifications = true
 
// init consumer
consumer, err := cluster.NewConsumer(brokerAddrs, groupID, topics, config)

Result: not the desired effect. sarama.OffsetNewest did not help because Consumer.Offsets.Initial is only consulted when the group has no committed offset for a partition. Since this group had already committed offsets, consumption resumed from the last committed position, and the messages produced while the program was down were consumed as well.

 

2. Reset the offset after the consumer is created

import (
	"fmt"
	"github.com/confluentinc/confluent-kafka-go/kafka"
	"os"
	"os/signal"
	"syscall"
)
 
func main() {
	broker := "kafka.in.netwa.cn:9092" 
	group := "my_test"                 
	topics := []string{"MyTopic"}      
 
	sigchan := make(chan os.Signal, 1)
	signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
 
	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers":  broker,
		"group.id":           group,
		"session.timeout.ms": 30000,
		// Deliver messages and rebalance events on the Events() channel
		// so we can intercept kafka.AssignedPartitions below.
		"go.events.channel.enable":        true,
		"go.application.rebalance.enable": true,
		"default.topic.config":            kafka.ConfigMap{"auto.offset.reset": "latest"}})
 
	if err != nil {
		fmt.Fprintf(os.Stderr, "Failed to create consumer: %s\n", err)
		os.Exit(1)
	}
	fmt.Printf("Created Consumer %v\n", c)
	err = c.SubscribeTopics(topics, nil)
	if err != nil {
		fmt.Fprintf(os.Stderr, "Failed to subscribe to topics: %s\n", err)
		os.Exit(1)
	}

	// Query the low/high watermarks for partition 0 of MyTopic (the
	// original post queried "ImTopic" here, which does not match the
	// subscribed topic; this also assumes a single partition — with
	// more partitions, query each one).
	low, high, err := c.QueryWatermarkOffsets("MyTopic", 0, -1)
	if err != nil {
		fmt.Fprintf(os.Stderr, "Failed to query watermark offsets: %s\n", err)
		os.Exit(1)
	}
	fmt.Printf("watermarks: low=%d high=%d\n", low, high)
 
	run := true
	for run {
		select {
		case sig := <-sigchan:
			fmt.Printf("Caught signal %v: terminating\n", sig)
			run = false
 
		case ev := <-c.Events():
			switch e := ev.(type) {
			case kafka.AssignedPartitions:
				fmt.Fprintf(os.Stderr, "%% %v\n", e)
				// Override each assigned offset for MyTopic with the
				// high watermark queried above, then apply the assignment.
				for i := range e.Partitions {
					if *e.Partitions[i].Topic == "MyTopic" {
						e.Partitions[i].Offset = kafka.Offset(high)
					}
				}
				fmt.Fprintf(os.Stderr, "%% %v\n", e)
				c.Assign(e.Partitions)
			case kafka.RevokedPartitions:
				fmt.Fprintf(os.Stderr, "%% %v\n", e)
				c.Unassign()
			case *kafka.Message:
				fmt.Printf("%% Message on %s:\n%s\n",
					e.TopicPartition, string(e.Value))
			case kafka.PartitionEOF:
				fmt.Printf("%% Reached %v\n", e)
			case kafka.Error:
				fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
				run = false
			}
		}
	}
 
	fmt.Printf("Closing consumer\n")
	c.Close()
}

Conclusion: the consumer picked up only the newest Kafka messages. When the kafka.AssignedPartitions event fires, each assigned partition's offset is overridden with the latest high watermark (e.Partitions[i].Offset = kafka.Offset(high)) before calling Assign.

————————————————
Copyright notice: this is an original article by CSDN blogger 「持成」, released under the CC 4.0 BY-SA license; include the original link and this notice when reposting.
Original link: https://blog.csdn.net/u011677067/article/details/81026314
