kafka

发布时间 2023-07-25 15:58:06作者: 费良
 1         /// <summary>
 2         /// 指定的组别的消费者开始消费指定主题的消息
 3         /// </summary>
 4         /// <param name="broker">Kafka消息服务器的地址</param>
 5         /// <param name="topic">Kafka消息所属的主题</param>
 6         /// <param name="groupID">Kafka消费者所属的组别</param> 
 7         public void Consume(string broker, string topic, string groupID)
 8         {
 9             Task.Run(() =>
10             {
11                 try
12                 {
13                     if (string.IsNullOrEmpty(broker) || string.IsNullOrWhiteSpace(broker) || broker.Length <= 0)
14                     {
15                         throw new ArgumentException("Kafka消息服务器的地址不能为空!");
16                     }
17                     if (string.IsNullOrEmpty(topic) || string.IsNullOrWhiteSpace(topic) || topic.Length <= 0)
18                     {
19                         throw new ArgumentNullException("Kafka消息所属的主题不能为空!");
20                     }
21                     if (string.IsNullOrEmpty(groupID) || string.IsNullOrWhiteSpace(groupID) || groupID.Length <= 0)
22                     {
23                         throw new ArgumentException("用户分组ID不能为空!");
24                     }
25                     var config = new ConsumerConfig
26                     {
27                         GroupId = groupID,
28                         BootstrapServers = broker,
29                         SecurityProtocol = SecurityProtocol.SaslPlaintext,
30                         SaslMechanism = SaslMechanism.Plain,
31                         StatisticsIntervalMs = 5000,
32                         EnableAutoCommit = false,      //手动提交 
33                         //EnableAutoCommit = true,     //自动提交
34                         //AutoCommitIntervalMs = 5000,   //自动提交偏移量的间隔时间,默认值为 5,000 毫秒(即5秒)
35                         HeartbeatIntervalMs = 3000,    //消费者发送心跳的间隔时间,默认值为 3,000 毫秒(即3秒)
36                         SessionTimeoutMs = 10000,      //消费者与 Kafka broker 之间的会话超时时间,默认值为 10,000 毫秒(即10秒)
37                         MaxPollIntervalMs = 300000,    //两次 poll 调用之间的最大时间间隔,默认值为 300,000 毫秒(即5分钟)
38                         AutoOffsetReset = AutoOffsetReset.Latest,     //最新消息
39                         //AutoOffsetReset = AutoOffsetReset.Earliest,
40                         EnablePartitionEof = true,
41                         PartitionAssignmentStrategy = PartitionAssignmentStrategy.CooperativeSticky,
42                         SaslUsername = "*****",                //SASL账户 
43                         SaslPassword = "***************",  //SASL密码 
44                     };
45 
46                     using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
47                     {
48                         consumer.Subscribe(topic);  //订阅主题 
49                         while (IsCancelled)         //循环消费
50                         {
51                             WriteLog("KafkaLog", "Kafka Consumer Starting...");
52                             OnKafkaEventHandler("Kafka Consumer Starting...");
53                             var consumerResult = consumer.Consume();         //消费Kafka  
54                             //var consumerResult = consumer.Consume(60000);  //消费Kafka  没有消息时设置超时时间
55 
56                             if (consumerResult.Message != null)
57                             {
58                                 var key = consumerResult.Message.Key;
59                                 var message = consumerResult.Message.Value;
60 
61                                 //显示Kafka数据
62                                 //OnKafkaEventHandler(message); 
63 
64                                 //记录Log 
65                                 WriteLog("KafkaLog", "Kafka Consumer Succeed");
66                                 OnKafkaEventHandler("Kafka Consumer Succeed");
67 
68                                 //插入数据库
69                                 InsertOracleDataBase(message);
70 
71                                 //手动提交
72                                 consumer.Commit(consumerResult);
73                                 WriteLog("KafkaLog", "Kafka Commit Succeed");
74                                 OnKafkaEventHandler("Kafka Commit Succeed");
75                             }
76                             else
77                             {
78                                 WriteLog("KafkaLog", "Kafka Consumer NoData");
79                                 OnKafkaEventHandler("Kafka Consumer NoData");
80                             }
81                         }
82                     }
83                 }
84                 catch (Exception ex)
85                 {
86                     IsCancelled = false;
87                     WriteLog("KafkaLog", ex.Message);
88                     OnKafkaEventHandler(ex.Message);
89                 }
90             });
91         }