如何解决是否有最佳实践来处理并发环境中跨多个通道的 RabbitMq 发布者确认?
我在 Asp.net API 应用程序中使用 Rabbitmq,并且已经实现了发布者确认模式。存储在 ConcurrentDictionary 中的通道和相应主体的序列号。从长远来看,我想使用多个渠道来提高性能。
我用 DefaultObjectPool 和 IPooledObjectPolicy 接口尝试了这个。这给了我一个频道,所以我尝试在每个线程上分配多个频道。 现在我遇到的问题是,我将各个通道的序列号存储在我的 ConcurrentDictionary 中,并且序列号和各个通道之间没有映射。
是否有在并发环境中跨多个渠道处理发布者确认的最佳做法?
池化对象:
public class RabbitModelPooledObjectPolicy : IPooledObjectPolicy<IModel>
{
private readonly ILogger<RabbitModelPooledObjectPolicy> _logger;
private readonly IOptions<RabbitMqConnectionSettings> _connectionSettings;
private readonly IConnection _connection;
public RabbitModelPooledObjectPolicy(ILogger<RabbitModelPooledObjectPolicy> logger,IOptions<RabbitMqConnectionSettings> connectionSettings)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_connectionSettings = connectionSettings ?? throw new ArgumentNullException(nameof(connectionSettings));
_connection = GetConnection();
}
public IModel Create()
{
return _connection.CreateModel();
}
public bool Return(IModel obj)
{
if (obj.IsOpen)
return true;
obj?.Dispose();
return false;
}
private IConnection GetConnection()
{
try
{
var conFactory = new ConnectionFactory
{
HostName = _connectionSettings.Value.HostName,UserName = _connectionSettings.Value.UserName,Password = _connectionSettings.Value.Password,VirtualHost = _connectionSettings.Value.VirtualHost,DispatchConsumersAsync = _connectionSettings.Value.DispatchConsumersAsync,AutomaticRecoveryEnabled = _connectionSettings.Value.AutomaticRecoveryEnabled
};
if (_connectionSettings.Value.Port != null)
conFactory.Port = _connectionSettings.Value.Port.Value;
return conFactory.CreateConnection();
}
catch (Exception e)
{
_logger.LogError("Error while trying to connect to RabbitMq server",e);
throw;
}
}
}
使用对象池的发布者:
public class Publisher : IPublishRabbitMessages
{
private readonly ILogger<Publisher> _logger;
private readonly IHandlePublisherConfirm _publisherConfirmHandler;
private readonly IConfigureRabbitQueue _queueConfiguration;
private readonly IOptions<RabbitQueueSettings> _queueSettings;
private volatile bool _configuredSuccessfully;
private readonly DefaultObjectPool<IModel> _objectPool;
public Publisher(
ILogger<Publisher> logger,IPooledObjectPolicy<IModel> objectPolicy,IConfigureRabbitQueue queueConfiguration,IOptions<RabbitQueueSettings> queueSettings,IHandlePublisherConfirm publisherConfirmHandler)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_objectPool = new DefaultObjectPool<IModel>(objectPolicy ?? throw new ArgumentNullException(nameof(objectPolicy)),Environment.ProcessorCount * 2);
_queueConfiguration = queueConfiguration ?? throw new ArgumentNullException(nameof(queueConfiguration));
_queueSettings = queueSettings ?? throw new ArgumentNullException(nameof(queueSettings));
_publisherConfirmHandler = publisherConfirmHandler ??
throw new ArgumentNullException(nameof(publisherConfirmHandler));
DoConfigure(_objectPool.Get());
}
public bool ConfiguredSuccessfully => _configuredSuccessfully;
public void BasicPublish<T>(IPublishMessageOptions<T> options) where T : class
{
if (!_configuredSuccessfully)
return;
var channel = _objectPool.Get();
try
{
var properties = channel.CreateBasicProperties();
properties.Persistent = options.Persistent;
var json = JsonConvert.SerializeObject(options.BodyObject);
var body = Encoding.UTF8.GetBytes(json);
_publisherConfirmHandler.AddOutstandingConfirm(channel.NextPublishSeqNo,json);
channel.BasicPublish(options.Exchange,options.RoutingKey,properties,body);
}
catch (Exception e)
{
_logger.LogError("Error while publishing data to rabbit queue",e);
throw;
}
finally
{
_objectPool.Return(channel);
}
}
public void DoConfigure(IModel channel)
{
try
{
_queueConfiguration.CreateQueue(channel,_queueSettings.Value);
channel.ConfirmSelect();
channel.BasicAcks += (sender,ea) =>
_publisherConfirmHandler.CleanOutstandingConfirms(ea.DeliveryTag,ea.Multiple);
channel.BasicNacks += (sender,ea) => _publisherConfirmHandler.HandleNack(ea.DeliveryTag);
_configuredSuccessfully = true;
}
catch (Exception e)
{
_logger.LogError($"{nameof(Publisher)} could not be configured properly with following error message : {e.Message}",e);
throw;
}
finally
{
_objectPool.Return(channel);
}
}
}
以及发布者用于处理确认的发布者确认处理程序:
public class PublisherConfirmHandler : IHandlePublisherConfirm
{
private readonly ILogger<PublisherConfirmHandler> _logger;
protected readonly ConcurrentDictionary<ulong,string> OutstandingConfirms;
public PublisherConfirmHandler(ILogger<PublisherConfirmHandler> logger)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
OutstandingConfirms = new ConcurrentDictionary<ulong,string>();
}
public void CleanOutstandingConfirms(ulong sequenceNumber,bool multiple)
{
if (multiple)
{
var confirmed = OutstandingConfirms.Where(k => k.Key <= sequenceNumber);
foreach (var entry in confirmed)
OutstandingConfirms.TryRemove(entry.Key,out _);
}
else
{
OutstandingConfirms.TryRemove(sequenceNumber,out _);
}
}
public void AddOutstandingConfirm(ulong sequenceNumber,string body)
{
OutstandingConfirms.TryAdd(sequenceNumber,body);
}
public void HandleNack(ulong sequenceNumber)
{
var successful = OutstandingConfirms.TryGetValue(sequenceNumber,out var body);
_logger.LogCritical($"Message with body {body} has been nack-ed. Sequence number : {sequenceNumber}");
}
}
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。