Erlo

go 操作kafka包 sarama (Producer流程)

2021-04-20 19:01:30 发布   717 浏览  
页面报错/反馈
收藏 点赞

go 操作kafka包 sarama (Producer流程)

代码:

package main

import (
	"github.com/Shopify/sarama"
	"log"
	"os"
	"os/signal"
	"sync"
	"time"
)


func main() {
	var (
		wg sync.WaitGroup
		success_num, error_num int
	)
	config := sarama.NewConfig() // 1
	config.Producer.Return.Successes = true
	config.Producer.Return.Errors = true
	// NewRandomPartitioner NewHashPartitioner NewRoundRobinPartitioner NewManualPartitioner NewReferenceHashPartitioner
	config.Producer.Partitioner = sarama.NewRandomPartitioner

	client,err := sarama.NewClient([]string{"localhost:9192","localhost:9292","localhost:9392"}, config) // 2
	if err != nil {
		panic(err)
	}
	defer client.Close()
	producer, err := sarama.NewAsyncProducerFromClient(client) // 3
	if err != nil {
		panic(err)
	}
	defer producer.AsyncClose()
	wg.Add(1)
	go func() {
		wg.Done()
		// config.Producer.Return.Successes = true 后一定要监听这个chan,默认大小256 如果满了就阻塞掉
		for range producer.Successes() {
			success_num  
		}
	}()
	wg.Add(1)
	go func() {
		wg.Done()
		// config.Producer.Return.Errors = true 后一定要监听这个chan,默认大小256 如果满了就阻塞掉
		for range producer.Errors() {
			error_num  
		}
	}()
	// Trap SIGINT to trigger a graceful shutdown.
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)
ProducerLoop:
	for {
		message := &sarama.ProducerMessage{Topic: "my_topic", Value: sarama.StringEncoder("testing 123")} // 4
		select {
		case producer.Input() 

在第一步,通过 sarama.NewConfig() 创建了 *Config 对象,其内部为各个配置项设置了初始值(默认值)。

在第二步,通过 sarama.NewClient()创建 Client对象

// NewClient is an abridged sketch of sarama's client constructor: it builds a
// client, registers the given addresses as seed brokers in random order,
// optionally performs an initial metadata refresh, and starts the background
// metadata updater before returning the client.
//
// Config validation, the client's field initialization and the error handling
// after RefreshMetadata are elided in this excerpt.
func NewClient(addrs []string, conf *Config) (Client, error) {
	
	// (elided) conf is validated first
	// initialize the client
	client := &client{...}
	// populate client.seedBrokers, visiting addrs in a random permutation
	random := rand.New(rand.NewSource(time.Now().UnixNano()))
	for _, index := range random.Perm(len(addrs)) {
		client.seedBrokers = append(client.seedBrokers, NewBroker(addrs[index]))
	}
	// true at initialization (per the original note — confirm against the Config defaults)
	if conf.Metadata.Full {
		// refresh the seed broker and metadata information registered in the client
		err := client.RefreshMetadata()
		switch err {
			// (elided) handle errors
		}
	}
	// start the background goroutine that, per the original note, periodically
	// refreshes metadata until the client is closed
	go withRecover(client.backgroundMetadataUpdater) 
	return client, nil
}

在第三步,通过sarama.NewAsyncProducerFromClient() 创建producer,内部调用newAsyncProducer创建

// Check that we are not dealing with a closed Client before processing any other arguments
	if client.Closed() {
		return nil, ErrClosedClient
	}

	txnmgr, err := newTransactionManager(client.Config(), client)
	if err != nil {
		return nil, err
	}

	p := &asyncProducer{
		client:     client,
		conf:       client.Config(),
		errors:     make(chan *ProducerError),
		input:      make(chan *ProducerMessage),
		successes:  make(chan *ProducerMessage),
		retries:    make(chan *ProducerMessage),
		brokers:    make(map[*Broker]*brokerProducer),
		brokerRefs: make(map[*brokerProducer]int),
		txnmgr:     txnmgr,
	}

	// launch our singleton dispatchers
	go withRecover(p.dispatcher)  // 开启输入chan 
	go withRecover(p.retryHandler)

	return p, nil

p.dispatcher 做的事情

func (p *asyncProducer) dispatcher() {
	handlers := make(map[string]chan

p.newTopicProducer(msg.Topic)

func (p *asyncProducer) newTopicProducer(topic string) chan

tp.dispatch

func (p *asyncProducer) newPartitionProducer(topic string, partition int32) chan

pp.dispatch

func (pp *partitionProducer) dispatch() {
	pp.leader, _ = pp.parent.client.Leader(pp.topic, pp.partition)
	if pp.leader != nil {
		pp.brokerProducer = pp.parent.getBrokerProducer(pp.leader) // 这里开启BrokerProducer其处理消息
		pp.parent.inFlight.Add(1) // we're generating a syn message; track it so we don't shut down while it's still inflight
		pp.brokerProducer.input 

pp.parent.getBrokerProducer(pp.leader)

// getBrokerProducer returns the brokerProducer registered for broker,
// creating and registering one via newBrokerProducer when none exists yet,
// and increments that producer's reference count in p.brokerRefs.
//
// The lookup, optional creation and count update all happen while holding
// p.brokerLock.
func (p *asyncProducer) getBrokerProducer(broker *Broker) *brokerProducer {
	p.brokerLock.Lock()
	defer p.brokerLock.Unlock()

	bp := p.brokers[broker]

	if bp == nil {
		// First request for this broker. Per the original note, this is where
		// the channel monitoring that sends messages and collects results is
		// started.
		bp = p.newBrokerProducer(broker)
		p.brokers[broker] = bp
		p.brokerRefs[bp] = 0
	}

	// Count this caller as a user of bp. The increment operator was missing
	// here, which left a bare expression statement that does not compile and
	// never updated the reference count.
	p.brokerRefs[bp]++

	return bp
}
登录查看全部

参与评论

评论留言

还没有评论留言,赶紧来抢楼吧~~

手机查看

返回顶部

给这篇文章打个标签吧~

棒极了 糟糕透顶 好文章 PHP JAVA JS 小程序 Python SEO MySql 确认