How to approach a data processing strategy: Kafka, a database, or some other persistence?
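I am working on a strategy for processing a large volume of (asynchronous) data. I have simplified the problem here.
I have a record set, let's say: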
A-event1
B-event1
B-event2
C-event1
C-event2
C-event3
B-event3
A-event2
A-event3
D-event1
D-event2
C-event4
A-event4
A-event4
A-event6
A-eventfinal
B-eventfinal
C-event6
C-event7
C-eventFinal
D-eventFinal
The transformed form of this record set would be:
A-event1      B-event1      C-event1      D-event1
A-event2      B-event2      C-event2      D-event2
A-event3      B-event3      C-event3      D-eventFinal
A-event4      B-eventfinal  C-event4
A-eventFinal                C-event5
                            C-event6
                            C-event7
                            C-eventFinal
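To make the transformation concrete, here is a minimal sketch of grouping the flat stream into one ordered list per entity and checking whether an entity has reached its final event. The `EventGrouping` class, the "entity-event" string format, and the case-insensitive match on `eventFinal` are assumptions made for illustration, not part of any particular framework.

```csharp
using System;
using System.Collections.Generic;

public static class EventGrouping
{
    // Splits "A-event1" into ("A", "event1"). The "entity-event" format is an assumption.
    public static (string Entity, string Name) Parse(string record)
    {
        var idx = record.IndexOf('-');
        return (record.Substring(0, idx), record.Substring(idx + 1));
    }

    // Groups records per entity, preserving arrival order within each entity.
    public static Dictionary<string, List<string>> GroupByEntity(IEnumerable<string> records)
    {
        var groups = new Dictionary<string, List<string>>();
        foreach (var record in records)
        {
            var (entity, name) = Parse(record);
            if (!groups.TryGetValue(entity, out var list))
                groups[entity] = list = new List<string>();
            list.Add(name);
        }
        return groups;
    }

    // An entity is ready for further processing once its last event is the final one.
    public static bool IsReady(List<string> events) =>
        events.Count > 0 &&
        events[events.Count - 1].Equals("eventFinal", StringComparison.OrdinalIgnoreCase);
}
```

Each column of the transformed set corresponds to one dictionary entry, and only the entries for which `IsReady` returns true would move on to the next stage.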
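Only once I have received the final event for an entity does that entity's set become eligible for further processing. That individual set is then sent to a third-party application for processing, and once that completes it returns a close event or acknowledgement, or possibly a failure. At that point the set can either be purged or kept for further correction (in case of failure). The acknowledgement or close may not arrive for days, so I have to keep this data somewhere (for example a database, Kafka, or something similar).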
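The lifecycle described above can be read as a small state machine per entity. The following is only a sketch under my own naming: `SetState`, `EntitySet`, and the `IsOverdue` check for acknowledgements that have not arrived are hypothetical, and a real system would persist this state rather than hold it in memory.

```csharp
using System;
using System.Collections.Generic;

public enum SetState { Collecting, Ready, Sent, Closed, Failed }

public sealed class EntitySet
{
    public EntitySet(string entityId) { EntityId = entityId; }

    public string EntityId { get; }
    public SetState State { get; private set; } = SetState.Collecting;
    public DateTime? SentAtUtc { get; private set; }
    public List<string> Events { get; } = new List<string>();

    // Events are only accepted until the final event has arrived.
    public void Add(string evt, bool isFinal)
    {
        if (State != SetState.Collecting)
            throw new InvalidOperationException($"{EntityId} is already {State}");
        Events.Add(evt);
        if (isFinal) State = SetState.Ready;
    }

    // A set may be sent when it is ready, or re-sent after a failure.
    public void MarkSent(DateTime nowUtc)
    {
        if (State != SetState.Ready && State != SetState.Failed)
            throw new InvalidOperationException($"{EntityId} cannot be sent while {State}");
        State = SetState.Sent;
        SentAtUtc = nowUtc;
    }

    // The third party answers with a close/acknowledgement or a failure.
    public void Acknowledge(bool success)
    {
        if (State != SetState.Sent)
            throw new InvalidOperationException($"{EntityId} was not sent");
        State = success ? SetState.Closed : SetState.Failed;
    }

    // True when the set was sent but no answer has arrived within the limit.
    public bool IsOverdue(DateTime nowUtc, TimeSpan limit) =>
        State == SetState.Sent && SentAtUtc.HasValue && nowUtc - SentAtUtc.Value > limit;
}
```

A set in `Closed` can be purged, while a set in `Failed` is kept for correction and can be sent again.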
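I have used A, B, C and D here as entity identifiers; in reality there could be tens of thousands of them (GUID-like values). I also need the ability to reprocess the entire record set.
A few options I have come up with are:
- A dynamic Kafka topic per identifier, but that could mean maintaining 100,000 topics. I am also trying to avoid using a DB.
- Put the whole set on a single Kafka topic and create a separate retry topic, which application X keeps polling.
I am open to any data processing algorithm, as long as no data is lost.
I understand this explanation is rather abstract. Please let me know if you need further details; any help is appreciated.
I am looking for an architectural approach.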
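Solution
Your description is a little light on detail. However, you could easily solve this with a database and some sort of pipeline (pick your poison).
In this very contrived example I have used TPL Dataflow. You can use any structure or framework you like, but the problem stays the same. Dataflow makes certain things in the example easy:
- It works with the async and await pattern.
- It can process things in an ordered fashion (or not).
- It uses queues for processing and can process things in parallel.
- The maximum degree of parallelism is configurable.
- You can create long-running pipelines.
- It supports cancellation tokens, and more.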
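As a small illustration of the points above, here is a sketch of a two-block pipeline that sets the degree of parallelism, a bounded queue, ordering, and a cancellation token. It assumes the System.Threading.Tasks.Dataflow package is referenced; the `PipelineSketch` class, the doubling step, and the specific option values are placeholders I picked for the example.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class PipelineSketch
{
    public static async Task<int[]> RunAsync(int count, CancellationToken token)
    {
        var results = new ConcurrentQueue<int>();

        // Up to 4 items are processed in parallel, but outputs keep input order.
        var work = new TransformBlock<int, int>(async x =>
        {
            await Task.Delay(10); // stand-in for real asynchronous work
            return x * 2;
        }, new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 4,
            BoundedCapacity = 16,   // senders wait when the block is full
            EnsureOrdered = true,
            CancellationToken = token
        });

        // The sink runs one item at a time (the default degree of parallelism).
        var sink = new ActionBlock<int>(x => results.Enqueue(x),
            new ExecutionDataflowBlockOptions { CancellationToken = token });

        // Completing the first block completes the second one as well.
        work.LinkTo(sink, new DataflowLinkOptions { PropagateCompletion = true });

        for (var i = 1; i <= count; i++)
            await work.SendAsync(i, token);

        work.Complete();
        await sink.Completion;
        return results.ToArray();
    }
}
```

A long-running pipeline would simply never call `Complete()` and would rely on the cancellation token to shut down.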
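I have had to make a lot of assumptions, and I have left a lot to the imagination.
- You will need to think about fault tolerance
- Implement a cancellation scheme
- Tune the degree of parallelism and other options
- Implement a database for the events
- And have a failover and restart mechanism (in case your process goes down)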
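For the restart point in particular, one possible approach is to replay the stored messages on startup and work out which groups were finalized but never confirmed. This is a sketch under the assumption that every message type (not only plain events) is appended to a durable log in arrival order; `StoredKind`, `StoredEvent`, and `Recovery` are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public enum StoredKind { Event, Final, Finished, Error }

public sealed class StoredEvent
{
    public int GroupId { get; set; }
    public StoredKind Kind { get; set; }
}

public static class Recovery
{
    // Replays the log in order and returns the groups that must be sent (again):
    // those with a Final or Error entry that was not followed by Finished.
    public static List<int> GroupsToResend(IEnumerable<StoredEvent> log)
    {
        var pending = new HashSet<int>();
        foreach (var e in log)
        {
            if (e.Kind == StoredKind.Final || e.Kind == StoredKind.Error)
                pending.Add(e.GroupId);
            else if (e.Kind == StoredKind.Finished)
                pending.Remove(e.GroupId);
        }
        return pending.OrderBy(id => id).ToList();
    }
}
```

Groups that have only plain events are left alone because they are still collecting, and the same replay can serve as the basis for reprocessing the whole record set.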
Example
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public enum EventType
{
    Event, Final, Finished, Error
}

public class EventMessage
{
    public int GroupId { get; set; }
    public int EventId { get; set; }
    public string Payload { get; set; }
    public EventType EventType { get; set; }
}

public static class Program
{
    // in-memory stand-in for a database, keyed by group id
    public static ConcurrentDictionary<int, List<EventMessage>> _dataStore = new ConcurrentDictionary<int, List<EventMessage>>();
    private static BufferBlock<EventMessage> _start;
    private static ActionBlock<EventMessage> _persistBlock;
    private static ActionBlock<EventMessage> _processBlock;
    private static ActionBlock<EventMessage> _finalizeBlock;
    private static TransformBlock<EventMessage, EventMessage> _reprocessBlock;
    private static TransformBlock<EventMessage, EventMessage> _queue;
    private static readonly Random _r = new Random();

    // Random is not thread-safe, so serialize access from the concurrently running tasks and blocks
    private static int Rand(int min, int max)
    {
        lock (_r) return _r.Next(min, max);
    }

    static async Task Main(string[] args)
    {
        // this is just a buffer that can receive asynchronous events
        _start = new BufferBlock<EventMessage>(new DataflowBlockOptions() { EnsureOrdered = true });
        // we need an orderly queue, the bounded capacity is 1 so we can process events in order
        // ie so you don't process the final before all events are received
        // (the last event may still be saving when the final is dispatched; a real implementation should guard against that)
        _queue = new TransformBlock<EventMessage, EventMessage>(message => message, new ExecutionDataflowBlockOptions() { BoundedCapacity = 1 });
        // save your events to the database
        _persistBlock = new ActionBlock<EventMessage>(PersistAction, new ExecutionDataflowBlockOptions() { BoundedCapacity = 1 });
        // process the final event
        _processBlock = new ActionBlock<EventMessage>(ProcessAction);
        // process the event from the 3rd party service
        _finalizeBlock = new ActionBlock<EventMessage>(FinalizeAction);
        // reprocess on failure or whatever you need to do
        _reprocessBlock = new TransformBlock<EventMessage, EventMessage>(Reprocess);

        // link it all together
        _start.LinkTo(_queue);
        _queue.LinkTo(_persistBlock, (x) => x.EventType == EventType.Event);
        _queue.LinkTo(_processBlock, (x) => x.EventType == EventType.Final);
        _queue.LinkTo(_finalizeBlock, (x) => x.EventType == EventType.Finished);
        _queue.LinkTo(_reprocessBlock, (x) => x.EventType == EventType.Error);
        _reprocessBlock.LinkTo(_start);

        // create some events
        var tasks = Enumerable.Range(1, 5).Select(CreateEvents);
        await Task.WhenAll(tasks);
        Console.ReadKey();
    }

    private static async Task CreateEvents(int groupId)
    {
        var events = Enumerable
            .Range(1, Rand(2, 5))
            .Select(x => new EventMessage()
            {
                GroupId = groupId, EventId = x, EventType = EventType.Event
            });
        foreach (var e in events)
        {
            await Task.Delay(Rand(10, 100));
            await _start.SendAsync(e);
        }
        await _start.SendAsync(new EventMessage()
        {
            GroupId = groupId, Payload = "Final Event", EventType = EventType.Final
        });
    }

    private static EventMessage Reprocess(EventMessage e)
    {
        // the event came back as an error, so we push it back onto the queue
        Console.WriteLine($"Reprocessing group : {e.GroupId}");
        e.EventType = EventType.Final;
        e.Payload = e.Payload + " Error";
        return e;
    }

    private static async Task PersistAction(EventMessage e)
    {
        // this is simulating saving the event to a db
        Console.WriteLine($"Saving event : {e.GroupId}:{e.EventId}");
        await Task.Delay(Rand(10, 100));
        _dataStore.AddOrUpdate(e.GroupId, (x) => new List<EventMessage>() { e }, (x, l) =>
        {
            l.Add(e);
            return l;
        });
    }

    private static async Task ProcessAction(EventMessage e)
    {
        // this is simulating reading all the events for that group from the db
        // and sending to your 3rd party service
        Console.WriteLine($"Sending to service : {e.GroupId}");
        await Task.Delay(Rand(10, 100));
        // this is simulating receiving a result from the 3rd party service
        // just pushes the event back into the queue, to be finalised or reprocessed
        // choose randomly if it was a success or failure
        // obviously this would be called by something else, possibly your message queue
        if (Rand(0, 2) == 0)
            e.EventType = EventType.Finished;
        else
            e.EventType = EventType.Error;
        Console.WriteLine($"Service returned : {e.GroupId},{e.EventType}");
        await _start.SendAsync(e);
    }

    private static void FinalizeAction(EventMessage e)
    {
        // purge the records, we are all done
        if (_dataStore.TryRemove(e.GroupId, out var l))
            Console.WriteLine($"*** Finalize : {e.GroupId} - {string.Join(",", l.Select(x => x.EventId))}");
        else
            Console.WriteLine($"*** Finalize : {e.GroupId} - nothing stored");
    }
}
Output
Saving event : 4:1
Saving event : 1:1
Saving event : 4:2
Saving event : 1:2
Saving event : 5:1
Saving event : 5:2
Saving event : 3:1
Saving event : 2:1
Saving event : 1:3
Saving event : 5:3
Sending to service : 1
Saving event : 5:4
Service returned : 1,Error
Sending to service : 5
Saving event : 2:2
Service returned : 5,Error
Saving event : 3:2
Saving event : 4:3
Saving event : 4:4
Sending to service : 4
Saving event : 2:3
Service returned : 4,Error
Saving event : 3:3
Sending to service : 3
Saving event : 2:4
Reprocessing group : 1
Reprocessing group : 5
Reprocessing group : 4
Service returned : 3,Error
Sending to service : 2
Reprocessing group : 3
Service returned : 2,Finished
Sending to service : 1
*** Finalize : 2 - 1,2,3,4
Service returned : 1,Finished
Sending to service : 5
*** Finalize : 1 - 1,3
Service returned : 5,Finished
Sending to service : 4
*** Finalize : 5 - 1,4
Service returned : 4,Finished
Sending to service : 3
*** Finalize : 4 - 1,4
Service returned : 3,Error
Reprocessing group : 3
Sending to service : 3
Service returned : 3,Finished
*** Finalize : 3 - 1,3
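Note: this is only an example. It is not meant to be a complete solution, a recommendation of Dataflow, or even a suggestion of how you should solve the problem. It is just meant to give you some idea of a structured pipeline.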