数据处理策略卡夫卡或数据库或任何其他持久性

如何解决数据处理策略卡夫卡或数据库或任何其他持久性

我正在研究大量的(异步)数据处理策略,在这里我简化了这个问题-

我有一个唱片集-假设-

A-event1
B-event1 
B-event2  
C-event1
C-event2
C-event3
B-event3
A-event2
A-event3
D-event1
D-event2
C-event4
A-event4
A-event4
A-event6
A-eventfinal
B-eventfinal
C-event6
C-event7
C-eventFinal
D-eventFinal

此记录集的转换将是

A-event1      B-event1         C-event1        D-event1
A-event2      B-event2         C-event2        D-event2
A-event3      B-event3         C-event3        D-eventFinal
A-event4      B-eventfinal     C-event4
A-eventFinal                   C-event5
                               C-event6
                               C-event7  
                               C-eventFinal
                   

一旦我获得了最终事件数据,那么只有该集合才可以进行进一步处理。一旦实体进入最终决赛,它就有资格进行进一步处理。该单独的集合现在被发送到第三方应用程序进行处理并且成功完成后,它将返回关闭事件或确认,或者可能是失败。因此,此单独的集合随时可以清除或保留以进行进一步更正(如果失败),为此,可能确认或关闭几天没有收到。所以我必须将这些数据保存在某个地方(例如数据库,Kafka或类似的东西)

我在这里用A,B,C和D作为实体标识符,可能要数万(像guid)。我还需要一种能力来重新处理整个记录集。

我所阐述的几个选择是

  1. 每个标识符都有一个动态的Kafka主题,但是无论如何它可能会维护10万个主题,我正努力避免使用DB。
  2. 已经将一个Kafka主题整套放置,并创建了另一个重试主题,这种应用程序X保持了轮询重试主题。

我愿意接受任何数据处理算法,更不用说数据丢失了。

我了解此解释不是很抽象,请让我知道,如果您需要进一步的解释,我们将不胜感激。

我正在寻找一种架构方法。

解决方法

您的描述有点细节。但是,您可以使用数据库和某种管道(选择毒药)轻松解决此问题

在这个非常人为的示例中,我使用了Dataflow,您可以使用任何喜欢的结构或框架,但是问题仍然相同。在该示例中,Dataflow可以轻松完成某些工作。

  • 可以使用异步和等待模式。
  • 以有序方式处理事物(或没有)
  • 可以使用队列进行处理,可以并行处理事物
  • 配置最大并行度
  • 可以创建永久的管道
  • 可以取消令牌以及更多

我不得不做很多假设,并留下了很多想像力。

  • 您需要考虑容错能力
  • 实施取消制度
  • 调整并行度和其他选项
  • 为事件实现数据库
  • 并具有故障转移和重启机制(如果您的进程出现故障)

示例

public enum EventType
{
   Event,Final,Finished,Error
}

public class EventMessage
{
   public int GroupId { get; set; }
   public int EventId { get; set; }
   public string Payload { get; set; }
   public EventType EventType { get; set; }
}

public static ConcurrentDictionary<int,List<EventMessage>> _dataStore = new ConcurrentDictionary<int,List<EventMessage>>();
private static BufferBlock<EventMessage> _start;
private static ActionBlock<EventMessage> _persistBlock;
private static ActionBlock<EventMessage> _processBlock;
private static ActionBlock<EventMessage> _finalizeBlock;
private static TransformBlock<EventMessage,EventMessage> _reprocessBlock;
private static TransformBlock<EventMessage,EventMessage> _queue;
private static Random _r = new Random();


static async Task Main(string[] args)
{

   // this is just a buffer that can receive asynchronous events
   _start = new BufferBlock<EventMessage> (new DataflowBlockOptions(){EnsureOrdered = true});

   // we need an orderly queue,the bounded capacity is 1 so we can process events in order 
   // ie so you don't process the final before all events are recevied
   _queue = new TransformBlock<EventMessage,EventMessage>(message => message,new ExecutionDataflowBlockOptions(){BoundedCapacity = 1});

   // save your events to the database
   _persistBlock = new ActionBlock<EventMessage>(PersistAction,new ExecutionDataflowBlockOptions() { BoundedCapacity = 1 });

   // process the final event
   _processBlock = new ActionBlock<EventMessage>(ProcessAction);

   // process the event from the 3rd party service
   _finalizeBlock = new ActionBlock<EventMessage>(FinalizeAction);

   // reprocess on failure or whatever you need to do
   _reprocessBlock = new TransformBlock<EventMessage,EventMessage>(Reprocess);

   // link it all together
   _start.LinkTo(_queue);
   _queue.LinkTo(_persistBlock,(x) => x.EventType == EventType.Event);
   _queue.LinkTo(_processBlock,(x) => x.EventType == EventType.Final);
   _queue.LinkTo(_finalizeBlock,(x) => x.EventType == EventType.Finished);
   _queue.LinkTo(_reprocessBlock,(x) => x.EventType == EventType.Error);
   _reprocessBlock.LinkTo(_start);

   // create some events
   var tasks= Enumerable.Range(1,5).Select(CreateEvents);

   await Task.WhenAll(tasks);

   Console.ReadKey();
}

private static async Task CreateEvents(int groupId)
{
   var events = Enumerable
      .Range(1,_r.Next(2,5))
      .Select(x => new EventMessage()
      {
         GroupId = groupId,EventId = x,EventType = EventType.Event
      });
   foreach (var e in events)
   {
      await Task.Delay(_r.Next(10,100));
      await _start.SendAsync(e);
   }

   await _start.SendAsync(new EventMessage()
   {
      GroupId = groupId,Payload = $"Final Event",EventType = EventType.Final
   });
}
private static EventMessage Reprocess(EventMessage e)
{
   // the event come back as an error,so we push it back on the the queue
   Console.WriteLine($"Reprocessing group : {e.GroupId}");
   e.EventType = EventType.Final;
   e.Payload = e.Payload + " Error";
   return e;
}

private static async Task PersistAction(EventMessage e)
{
   // this is simulating saving the event to a db
   Console.WriteLine($"Saving event : {e.GroupId}:{e.EventId}");
   await Task.Delay(_r.Next(10,100));
   _dataStore.AddOrUpdate(e.GroupId,(x) => new List<EventMessage>() {e},(x,l) =>
      {
         l.Add(e);
         return l;
      });
}
private static async Task ProcessAction(EventMessage e)
{
   // this is simulating reading all the events for that group from the db
   // and sending to your 3rd service
   Console.WriteLine($"Sending to service : {e.GroupId}");

   await Task.Delay(_r.Next(10,100));

   // this is simulating receiving a result from the 3rd party service 
   // just pushes the event back in to the queue,to be finialised or reprocessed
   // choose randomly if it was a success or failure
   // obviously this would be called by something else,possibly your message queue
   if (_r.Next(0,2) == 0)
      e.EventType = EventType.Finished;
   else
      e.EventType = EventType.Error;


   Console.WriteLine($"Service returned : {e.GroupId},{e.EventType}");

   await _start.SendAsync(e);
}
private static void FinalizeAction(EventMessage e)
{
 // pruge the records,we are all done
   _dataStore.TryRemove(e.GroupId,out var l);

   Console.WriteLine($"*** Finalize : {e.GroupId} - {string.Join(",",l.Select(x => x.EventId))}");
}

输出

Saving event : 4:1
Saving event : 1:1
Saving event : 4:2
Saving event : 1:2
Saving event : 5:1
Saving event : 5:2
Saving event : 3:1
Saving event : 2:1
Saving event : 1:3
Saving event : 5:3
Sending to service : 1
Saving event : 5:4
Service returned : 1,Error
Sending to service : 5
Saving event : 2:2
Service returned : 5,Error
Saving event : 3:2
Saving event : 4:3
Saving event : 4:4
Sending to service : 4
Saving event : 2:3
Service returned : 4,Error
Saving event : 3:3
Sending to service : 3
Saving event : 2:4
Reprocessing group : 1
Reprocessing group : 5
Reprocessing group : 4
Service returned : 3,Error
Sending to service : 2
Reprocessing group : 3
Service returned : 2,Finished
Sending to service : 1
*** Finalize : 2 - 1,2,3,4
Service returned : 1,Finished
Sending to service : 5
*** Finalize : 1 - 1,3
Service returned : 5,Finished
Sending to service : 4
*** Finalize : 5 - 1,4
Service returned : 4,Finished
Sending to service : 3
*** Finalize  : 4 - 1,4
Service returned : 3,Error
Reprocessing group : 3
Sending to service : 3
Service returned : 3,Finished
*** Finalize : 3 - 1,3

注意:这仅是一个示例,并不意味着它是完整的解决方案或对数据流的建议,甚至不建议您解决该问题。仅仅是为了让您对结构化管道有所了解。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


依赖报错 idea导入项目后依赖报错,解决方案:https://blog.csdn.net/weixin_42420249/article/details/81191861 依赖版本报错:更换其他版本 无法下载依赖可参考:https://blog.csdn.net/weixin_42628809/a
错误1:代码生成器依赖和mybatis依赖冲突 启动项目时报错如下 2021-12-03 13:33:33.927 ERROR 7228 [ main] o.s.b.d.LoggingFailureAnalysisReporter : *************************** APPL
错误1:gradle项目控制台输出为乱码 # 解决方案:https://blog.csdn.net/weixin_43501566/article/details/112482302 # 在gradle-wrapper.properties 添加以下内容 org.gradle.jvmargs=-Df
错误还原:在查询的过程中,传入的workType为0时,该条件不起作用 &lt;select id=&quot;xxx&quot;&gt; SELECT di.id, di.name, di.work_type, di.updated... &lt;where&gt; &lt;if test=&qu
报错如下,gcc版本太低 ^ server.c:5346:31: 错误:‘struct redisServer’没有名为‘server_cpulist’的成员 redisSetCpuAffinity(server.server_cpulist); ^ server.c: 在函数‘hasActiveC
解决方案1 1、改项目中.idea/workspace.xml配置文件,增加dynamic.classpath参数 2、搜索PropertiesComponent,添加如下 &lt;property name=&quot;dynamic.classpath&quot; value=&quot;tru
删除根组件app.vue中的默认代码后报错:Module Error (from ./node_modules/eslint-loader/index.js): 解决方案:关闭ESlint代码检测,在项目根目录创建vue.config.js,在文件中添加 module.exports = { lin
查看spark默认的python版本 [root@master day27]# pyspark /home/software/spark-2.3.4-bin-hadoop2.7/conf/spark-env.sh: line 2: /usr/local/hadoop/bin/hadoop: No s
使用本地python环境可以成功执行 import pandas as pd import matplotlib.pyplot as plt # 设置字体 plt.rcParams[&#39;font.sans-serif&#39;] = [&#39;SimHei&#39;] # 能正确显示负号 p
错误1:Request method ‘DELETE‘ not supported 错误还原:controller层有一个接口,访问该接口时报错:Request method ‘DELETE‘ not supported 错误原因:没有接收到前端传入的参数,修改为如下 参考 错误2:cannot r
错误1:启动docker镜像时报错:Error response from daemon: driver failed programming external connectivity on endpoint quirky_allen 解决方法:重启docker -&gt; systemctl r
错误1:private field ‘xxx‘ is never assigned 按Altʾnter快捷键,选择第2项 参考:https://blog.csdn.net/shi_hong_fei_hei/article/details/88814070 错误2:启动时报错,不能找到主启动类 #
报错如下,通过源不能下载,最后警告pip需升级版本 Requirement already satisfied: pip in c:\users\ychen\appdata\local\programs\python\python310\lib\site-packages (22.0.4) Coll
错误1:maven打包报错 错误还原:使用maven打包项目时报错如下 [ERROR] Failed to execute goal org.apache.maven.plugins:maven-resources-plugin:3.2.0:resources (default-resources)
错误1:服务调用时报错 服务消费者模块assess通过openFeign调用服务提供者模块hires 如下为服务提供者模块hires的控制层接口 @RestController @RequestMapping(&quot;/hires&quot;) public class FeignControl
错误1:运行项目后报如下错误 解决方案 报错2:Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.8.1:compile (default-compile) on project sb 解决方案:在pom.
参考 错误原因 过滤器或拦截器在生效时,redisTemplate还没有注入 解决方案:在注入容器时就生效 @Component //项目运行时就注入Spring容器 public class RedisBean { @Resource private RedisTemplate&lt;String
使用vite构建项目报错 C:\Users\ychen\work&gt;npm init @vitejs/app @vitejs/create-app is deprecated, use npm init vite instead C:\Users\ychen\AppData\Local\npm-