如何解决写入Cassandra时防止种族状况
我有一个实时流传输解决方案,其中有 Kafka , Spark (作为聚合引擎)和 Cassandra (作为商店)。用户定义所需的聚合,然后引擎创建该聚合并将其写入商店。这是一个如何创建聚合的示例
CREATE AGGR COUNT FROM input_data WHERE type,event,id
这将为3列创建一个 count 汇总并写入C *。
我们也需要处理历史数据。这意味着,如果今天创建了一个汇总,我们需要返回并修复其历史记录。为了满足这种用例,我们在Cassandra中创建了一个 hvalue 列。这是供参考的模式
CREATE TABLE tbl (
key blob,key2 blob,key3 blob,...
key15 blob,column1 blob,column2 blob,...
column20 blob,*hvalue* blob,*value* blob,PRIMARY KEY ((key,key2,key3 ... key15),column1 ... column20)
) WITH CLUSTERING ORDER BY (column1 ASC,column2 ASC .. column20 ASC)
值存储在线处理时计算出的事实。 hvalue 存储用于历史处理的值。查询时,将检索,合并这两个列并将其返回给用户。
我们正在使用datastax leftJoin API与Cassandra联接。
RDD.leftJoinWithCassandraTable(keyspace,tableName)
.on(SomeColumns(...)
.map { case (ip,row) => row match {
case None => ip
case Some(data) => CASSANDRA_MAP_SCHEMA(...)
)
}
}.saveToCassandra(keyspace,tableName)
简而言之,我们为RDD创建一个架构,并将该行写入Cassandra。
现在,这是问题所在。在历史过程中,我们需要创建一行以写入Cassandra。这意味着我们需要向“值”列提供一些数据。如果它是Cassandra中不存在的新行,我们将创建一个空对象并写回。如果该行存在,我们将现有值取回。 在线和历史过程将同时运行。这意味着当历史进程读取一行并回写时,在线进程可能已经创建了同一行。这将导致数据损坏,因为历史进程可能会读取过时的数据并更新在线进程写入的值。 我不确定如何解决此问题。如果有其他解决方案可以防止这种情况,我将不胜感激。 我尽力解释了一切,让我知道是否需要进一步说明,我将尝试添加更多输入。
预先感谢您的帮助。
解决方法
有几种方法可以解决此问题,但是没有一种方法很简单。从根本上来说,写后很难写。
首先,您引入了一种共享的外部锁定机制,在该机制下,您可以为该行获取一个锁定,并在完成该操作后将其释放或保留较短的ttl。您可以为此使用Redis之类的东西。
第二种选择是通过kafka队列将对Cassandra的所有更改集中到一起,以便只允许写入一个源。尽管这有可能使您的问题变得更糟。如果要执行此操作,请确保基于密钥对队列进行分区,以使同一密钥始终路由到同一队列。
第三个选择是,仅允许服务在给定时间范围内对数据进行操作。如果仅允许您的在线数据处理最后一天或X个小时等数据,而您的历史记录仅允许处理超过该时间段的数据,那么几乎没有机会碰到冲突。
第四个选择是接受这是一种可能性,并且它发生的可能性很小,因此这不是问题。如果运行代码的数据中心非常接近(最好与数据库共存),并且您在读写之间的行上没有进行大量处理,那么这可能是一个合理的选择。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。