代码:
package main
import (
"github.com/Shopify/sarama"
"log"
"os"
"os/signal"
"sync"
"time"
)
// main walks through the sarama async-producer setup that the article
// dissects: build a Config (step 1), create a Client from three seed broker
// addresses (step 2), create an AsyncProducer from that client (step 3), start
// one goroutine ranging over Successes() and one over Errors(), register for
// os.Interrupt, and then loop building messages for "my_topic" (step 4).
// NOTE(review): this listing is cut off inside the select statement at its
// end, so the remainder of the loop and of the function is not visible here.
func main() {
var (
wg sync.WaitGroup
success_num, error_num int
)
config := sarama.NewConfig() // 1
// Both result channels are enabled here, which is why both are ranged over below.
config.Producer.Return.Successes = true
config.Producer.Return.Errors = true
// NewRandomPartitioner NewHashPartitioner NewRoundRobinPartitioner NewManualPartitioner NewReferenceHashPartitioner
config.Producer.Partitioner = sarama.NewRandomPartitioner
client,err := sarama.NewClient([]string{"localhost:9192","localhost:9292","localhost:9392"}, config) // 2
if err != nil {
panic(err)
}
defer client.Close()
producer, err := sarama.NewAsyncProducerFromClient(client) // 3
if err != nil {
panic(err)
}
defer producer.AsyncClose()
wg.Add(1)
go func() {
// NOTE(review): wg.Done() runs immediately, before the loop below has
// received anything; presumably `defer wg.Done()` was intended — confirm.
wg.Done()
// With config.Producer.Return.Successes = true this channel must be read;
// per the article the default size is 256 and the producer blocks once it is full.
for range producer.Successes() {
// NOTE(review): a bare `success_num` is not a valid Go statement; the
// increment operator (`++`) appears to have been lost — confirm.
success_num
}
}()
wg.Add(1)
go func() {
// NOTE(review): same as above — presumably `defer wg.Done()` was intended.
wg.Done()
// With config.Producer.Return.Errors = true this channel must be read;
// per the article the default size is 256 and the producer blocks once it is full.
for range producer.Errors() {
// NOTE(review): a bare `error_num` is not a valid Go statement; the
// increment operator (`++`) appears to have been lost — confirm.
error_num
}
}()
// Trap SIGINT to trigger a graceful shutdown.
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)
ProducerLoop:
for {
message := &sarama.ProducerMessage{Topic: "my_topic", Value: sarama.StringEncoder("testing 123")} // 4
select {
// NOTE(review): the case below is truncated; presumably it sends message on
// producer.Input() — the rest of the select is missing from this listing.
case producer.Input()
在第一步,通过 sarama.NewConfig() 创建了 *Config 对象,其内部为大量配置项设置了默认值。
在第二步,通过 sarama.NewClient()创建 Client对象
// NewClient (abridged excerpt) builds a client from the seed broker addresses
// in addrs and the configuration conf. On the path shown here it returns the
// client with a nil error; validation and error handling are elided.
func NewClient(addrs []string, conf *Config) (Client, error) {
// conf is validated first (elided in this excerpt).
// Initialize the client (fields elided).
client := &client{...}
// Populate client.seedBrokers, visiting addrs in a random permutation.
random := rand.New(rand.NewSource(time.Now().UnixNano()))
for _, index := range random.Perm(len(addrs)) {
client.seedBrokers = append(client.seedBrokers, NewBroker(addrs[index]))
}
// Per the article, Metadata.Full is true on a freshly created config.
if conf.Metadata.Full {
// Per the article: refresh the seed broker and metadata information held by the client.
err := client.RefreshMetadata()
switch err {
// Error handling (elided in this excerpt).
}
}
// Per the article: start a background goroutine that periodically refreshes
// metadata until the client is closed.
go withRecover(client.backgroundMetadataUpdater)
return client, nil
}
在第三步,通过 sarama.NewAsyncProducerFromClient() 创建 producer,其内部调用 newAsyncProducer 完成创建(下面是 newAsyncProducer 的函数体):
// Body of newAsyncProducer as quoted by the article (the function header is
// not part of this excerpt). It rejects a closed client, creates a transaction
// manager, assembles the asyncProducer with its channels and broker maps, and
// starts the dispatcher and retryHandler goroutines before returning.
// Check that we are not dealing with a closed Client before processing any other arguments
if client.Closed() {
return nil, ErrClosedClient
}
txnmgr, err := newTransactionManager(client.Config(), client)
if err != nil {
return nil, err
}
// All four channels below are created without a buffer.
p := &asyncProducer{
client: client,
conf: client.Config(),
errors: make(chan *ProducerError),
input: make(chan *ProducerMessage),
successes: make(chan *ProducerMessage),
retries: make(chan *ProducerMessage),
brokers: make(map[*Broker]*brokerProducer),
brokerRefs: make(map[*brokerProducer]int),
txnmgr: txnmgr,
}
// launch our singleton dispatchers
go withRecover(p.dispatcher) // per the article: starts servicing the input channel
go withRecover(p.retryHandler)
return p, nil
p.dispatcher 做的事情
func (p *asyncProducer) dispatcher() {
handlers := make(map[string]chan
p.newTopicProducer(msg.Topic)
func (p *asyncProducer) newTopicProducer(topic string) chan
tp.dispatch
func (p *asyncProducer) newPartitionProducer(topic string, partition int32) chan
pp.dispatch
// dispatch (excerpt, truncated in this listing) looks up the leader broker for
// pp.topic/pp.partition and, when a leader is known, obtains a brokerProducer
// for it and registers one in-flight message.
func (pp *partitionProducer) dispatch() {
// The error returned by Leader is discarded here; only a nil leader is checked.
pp.leader, _ = pp.parent.client.Leader(pp.topic, pp.partition)
if pp.leader != nil {
pp.brokerProducer = pp.parent.getBrokerProducer(pp.leader) // per the article: this starts the BrokerProducer that processes messages
pp.parent.inFlight.Add(1) // we're generating a syn message; track it so we don't shut down while it's still inflight
// NOTE(review): the statement below is cut off in this listing; presumably it
// sends the syn message on the brokerProducer's input channel — confirm.
pp.brokerProducer.input
pp.parent.getBrokerProducer(pp.leader)
// getBrokerProducer returns the brokerProducer associated with broker,
// creating and registering one on first use, and records one more reference
// to it in p.brokerRefs. p.brokerLock is held for the duration of the call so
// the lookup, creation and reference count update happen together.
func (p *asyncProducer) getBrokerProducer(broker *Broker) *brokerProducer {
	p.brokerLock.Lock()
	defer p.brokerLock.Unlock()

	bp := p.brokers[broker]
	if bp == nil {
		// First request for this broker. Per the article, newBrokerProducer
		// is where the channel-driven sending and result handling starts.
		bp = p.newBrokerProducer(broker)
		p.brokers[broker] = bp
		p.brokerRefs[bp] = 0
	}

	// Count the caller as a user of bp. A bare `p.brokerRefs[bp]` expression
	// is not a valid statement and would leave the count at zero.
	p.brokerRefs[bp]++

	return bp
}
登录查看全部
参与评论
手机查看
返回顶部