如何解决我想在一个 .net5 项目中使用两个不同的 kafka 主题
我有一个 .net5 项目并编写代码来使用不同的 kafka 主题。
当我只消费一个主题时一切正常 - 一个组,但如果我放置另一个主题来消费它,只有第一个主题组被消费,而另一个不是。
我的代码就是这样;
Startup.cs
public void ConfigureServices(IServiceCollection services)
{
services.AddMvc().AddMvcOptions(o =>
{
o.EnableEndpointRouting = false;
});
services.AddSingleton<IHostedService,ActivateMovieConsumerHandler>();
services.AddSingleton<IHostedService,DeactivateMovieConsumerHandler>();
services.AddElasticsearch(Configuration);
services.AddScoped<IElasticService,ElasticService>();
}
激活的第一个消费者处理程序代码
using System;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Logic.Business.Service.Interfaces;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Hosting;
namespace Logic.Business.Service.Kafka.Handlers
{
public class ActivateMovieConsumerHandler : IHostedService
{
private readonly IConfiguration _configuration;
private readonly IElasticService _elasticService;
public ActivateMovieConsumerHandler(IConfiguration configuration,IElasticService elasticService)
{
_configuration = configuration;
_elasticService = elasticService;
}
public Task StartAsync(CancellationToken cancellationToken)
{
var conf = new ConsumerConfig
{
GroupId = _configuration["MovieActivatedGroupName"],BootstrapServers = _configuration["KafkaBootstrapServers"],AutoOffsetReset = AutoOffsetReset.Earliest
};
using (var builder = new ConsumerBuilder<Ignore,string>(conf).Build())
{
builder.Subscribe(_configuration["MovieActivatedTopicName"]);
var cancelToken = new CancellationTokenSource();
while (true)
{
var consumer = builder.Consume(cancelToken.Token);
try
{
_elasticService.InsertMovie(consumer.Message.Value);
}
catch (Exception exception)
{
builder.Close();
}
}
}
return Task.CompletedTask;
}
public Task StopAsync(CancellationToken cancellationToken)
{
return Task.CompletedTask;
}
}
}
以及用于停用的第二个消费者处理程序
using System;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Logic.Business.Service.Interfaces;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Hosting;
namespace Logic.Business.Service.Kafka.Handlers
{
public class DeactivateMovieConsumerHandler : IHostedService
{
private readonly IConfiguration _configuration;
private readonly IElasticService _elasticService;
public DeactivateMovieConsumerHandler(IConfiguration configuration,IElasticService elasticService)
{
_configuration = configuration;
_elasticService = elasticService;
}
public Task StartAsync(CancellationToken cancellationToken)
{
var conf = new ConsumerConfig
{
GroupId = _configuration["MovieDeactivatedGroupName"],string>(conf).Build())
{
builder.Subscribe(_configuration["MovieDeactivatedTopicName"]);
var cancelToken = new CancellationTokenSource();
while (true)
{
var consumer = builder.Consume(cancelToken.Token);
try
{
_elasticService.DeleteMovie(consumer.Message.Value);
}
catch (Exception exception)
{
builder.Close();
}
}
}
return Task.CompletedTask;
}
public Task StopAsync(CancellationToken cancellationToken)
{
return Task.CompletedTask;
}
}
}
如上startup.cs中所见;我将两个依赖项用于激活和停用,但代码仅针对第一个处理程序运行,其他未在侦听程序
预先感谢您的帮助
解决方法
我解决了为新消费者添加新线程的问题
我的最终代码如下
using System;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Logic.Business.Service.Interfaces;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Hosting;
namespace Logic.Business.Service.Kafka.Handlers
{
public class MovieConsumerHandler : IHostedService
{
private readonly IConfiguration _configuration;
private readonly IElasticService _elasticService;
public MovieConsumerHandler(IConfiguration configuration,IElasticService elasticService)
{
_configuration = configuration;
_elasticService = elasticService;
}
public Task StartAsync(CancellationToken cancellationToken)
{
Thread activatedThread = new Thread(StartActivatedConsumer)
{
Name = "ActivatedThread"
};
Thread deactivatedThread = new Thread(StartDeactivatedConsumer)
{
Name = "DeactivatedThread"
};
activatedThread.Start();
deactivatedThread.Start();
return Task.CompletedTask;
}
private void StartActivatedConsumer()
{
var conf = new ConsumerConfig
{
GroupId = _configuration["MovieActivatedGroupName"],BootstrapServers = _configuration["KafkaBootstrapServers"],AutoOffsetReset = AutoOffsetReset.Earliest
};
using (var builder = new ConsumerBuilder<Ignore,string>(conf).Build())
{
builder.Subscribe(_configuration["MovieActivatedTopicName"]);
var cancelToken = new CancellationTokenSource();
while (true)
{
var consumer = builder.Consume(cancelToken.Token);
try
{
_elasticService.InsertMovie(consumer.Message.Value);
}
catch (Exception exception)
{
builder.Close();
}
}
}
}
private void StartDeactivatedConsumer()
{
var conf = new ConsumerConfig
{
GroupId = _configuration["MovieDeactivatedGroupName"],string>(conf).Build())
{
builder.Subscribe(_configuration["MovieDeactivatedTopicName"]);
var cancelToken = new CancellationTokenSource();
while (true)
{
var consumer = builder.Consume(cancelToken.Token);
try
{
_elasticService.DeleteMovie(consumer.Message.Value);
}
catch (Exception exception)
{
builder.Close();
}
}
}
}
public Task StopAsync(CancellationToken cancellationToken)
{
return Task.CompletedTask;
}
}
}
在 .net 中,如果我们不标记另一个线程,则一个线程已启动,到目前为止,我创建了一个新线程,然后使用了另一个主题
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。