如何解决使用CDC的Mulesoft与Salesforce Streaming API
我正在研究Mule API流程,以测试Salesforce事件流。我已经设置好连接器并订阅了流媒体频道。
当我创建/更新/删除联系人记录,事件通过并通过将它们添加到另一个数据库进行处理时,此方法就很好了。
我对Border
功能有些困惑。使用当前设置,我可以关闭Mule应用程序,在组织中创建联系人,然后当我使该应用程序重新联机时,它会通过从其上次中断的地方添加数据来恢复。完美。
但是,我正在尝试模拟如果在处理事件时the子应用程序崩溃会发生什么情况。
我运行了一些APEX来创建100条随机联系记录。一旦看到它记录了我应用程序中的第一个流,我就杀死了kill子应用程序。我在这里的假设是,当我恢复应用程序时,它会知道它从何处退出,就像在上一次测试中那样,在创建联系人之前是脱机的。
我注意到的是,在我关闭该应用程序之前,它仅处理通过它的几个联系人。
看来,事件可能在流输入中是如此之快,以至于它已经到达流中的最后<ListView x:Name="SearchFields" Grid.Row="0" Grid.Column="0" ItemsSource="{Binding CustomerSearchFields}" SelectedItem="{Binding selectedSearchField,UpdateSourceTrigger=LostFocus}" Style="{StaticResource MaterialDropShadowStyle}"
BorderThickness="0,2,0" Padding="24,24,0" HorizontalContentAlignment="Stretch" helper:EnterKeyTraversal.IsEnabled="True" KeyboardNavigation.TabNavigation="Cycle" FontFamily="{StaticResource DefaultFontFamily}"
Background="{StaticResource ColorLightGray2}">
<ListView.Resources>
<DataTemplate DataType="{x:Type model:CustomerSeachFieldViewModel}">
<Grid>
<Grid.RowDefinitions>
<RowDefinition Height="*" />
<RowDefinition Height="48" />
</Grid.RowDefinitions>
<TextBlock Grid.Row="0" Text="{Binding Description}" FontSize="{StaticResource FontSizeSmall}" FontWeight="SemiBold" Foreground="{StaticResource ColorDarkGray}" Margin="0,4" />
<Border x:Name="PART_Border" Grid.Row="1" BorderThickness="1" BorderBrush="{StaticResource ColorGray}">
<Grid Background="White">
<Grid.ColumnDefinitions>
<ColumnDefinition Width="*" />
<ColumnDefinition Width="48" />
</Grid.ColumnDefinitions>
<TextBox Grid.Column="0" Text="{Binding SearchText}" VerticalAlignment="Stretch" HorizontalAlignment="Stretch" FontSize="{StaticResource FontSizeNormal}" Padding="12,15" BorderThickness="0" />
<Border Grid.Column="1" Background="{StaticResource ColorLightGray2}" Margin="8">
<ctrl:IconViewbox IconData="{StaticResource IconPathSearch}" IconSize="16" IsTabStop="False" />
</Border>
</Grid>
</Border>
</Grid>
</DataTemplate>
</ListView.Resources>
<ListView.ItemContainerStyle>
<Style TargetType="ListViewItem">
<Setter Property="IsTabStop" Value="False" />
<Setter Property="Margin" Value="0,24" />
<Setter Property="Template">
<Setter.Value>
<ControlTemplate>
<ContentPresenter Content="{Binding}" />
</ControlTemplate>
</Setter.Value>
</Setter>
<Style.Triggers>
<Trigger Property="IsKeyboardFocusWithin" Value="True">
<Setter Property="IsSelected" Value="True" />
</Trigger>
<Trigger Property="IsSelected" Value="True">
<Trigger.Setters>
<Setter Property="BorderBrush" Value="Fuchsia" />
</Trigger.Setters>
</Trigger>
</Style.Triggers>
</Style>
</ListView.ItemContainerStyle>
</ListView>
。但是,由于这些记录仍未添加到我的外部数据库中,因此我丢失了这些记录。流完成了应做的工作,但是由于该应用仍在处理大量工作,因此我的100条记录没有像replayId
那样提交。
如何解决此问题,以免在应用程序崩溃前如果有大量数据流而不会丢失数据?我记得使用Kafka时,一旦将ID插入数据库,您就必须能够replayId
,以便它知道您正式处理的最后一个ID。在Mule中有这样一个概念,我可以告诉它我正式退出并致力于DB的位置吗?
解决方法
协议(CometD)级别的可靠性意味着许多属性。其中最主要的是订户已接收到的消息的事务性ACK(通知)。 CometD支持ACK作为扩展。 Salesforce的CometD实施不支持ACK。即使这样做,您仍然有issues ...但是风险发生的频率/损失可能会更低。
对于您而言,您必须设计一种解决方案,该解决方案相当于查找和重放未提交给目标数据库的事件。您可以使用Mule中的自定义代码或接线适配器来执行此操作。重播ID值不能保证在连续事件中是连续的,但是将对其进行排序。重播ID为100的事件A之后是重播ID为200的事件B。
您将需要在数据库中存储重播ID值。然后,您可以在重新订阅(订户失败后)时使用它来从SF中检索数据库中缺少的事件。仅当故障窗口足够小时,这才起作用。对于标准平台事件许可证,Salesforce事件保留窗口当前为24小时。较高级别的许可证可以保留更长的时间。
根据数据量,事件的发生频率和其他过程参数,可以使用Heroku Connect来开箱即用。它的确暗示了有关Heroku的Postgres DB + HC的许可成本和运营成本,但是我们在类似情况下的大多数客户都认为值得。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。