如何解决在Node JS应用中使用Kafka Consumer来表示已进行计算
所以我的问题可能涉及基于应用程序性质的一些头脑风暴。
我有一个Node JS应用程序,可以将消息发送到Kafka。例如,用户每次单击页面时,Kafka应用程序都会基于访问量运行计算。然后,在同一实例中,我想通过我的Kafka消息触发计算后检索该计算。到目前为止,此计算已存储在Cassandra数据库中。问题是,如果我们尝试在计算完成之前从Cassandra中读取数据,那么我们将不会从数据库中查询任何内容(尚未插入键),并且不会返回任何内容(错误),或者计算可能已过期。到目前为止,这是我的代码。
router.get('/:slug',async (req,res) =>{
Producer = kafka.Producer
KeyedMessage = kafka.KeyedMessage
client = new kafka.KafkaClient()
producer = new Producer(client)
km = new KeyedMessage('key','message')
kafka_message = JSON.stringify({ id: req.session.session_id.toString(),url: arbitrary_url })
payloads = [
{ topic: 'MakeComputationTopic',messages: kafka_message}
];
const clientCass = new cassandra.Client({
contactPoints: ['127.0.0.1:9042'],localDataCenter: 'datacenter1',// here is the change required
keyspace: 'computation_space',authProvider: new auth.PlainTextAuthProvider('cassandra','cassandra')
});
const query = 'SELECT * FROM computation WHERE id = ?';
clientCass.execute(query,[req.session.session_id],{ hints : ['int'] })
.then(result => console.log('User with email %s',result.rows[0].computations))
.catch((message) => {
console.log('Could not find key')
});
}
首先,我想到了异步和等待,但这是可以排除的,因为这不会停止过时的计算。
其次,我研究了让应用程序进入睡眠状态,但是这种方式似乎会使我的应用程序变慢。
我可能决定使用Kafka Consumer(在我的node-js中)消耗一条消息,该消息表明现在可以安全地查看Cassandra表。
例如(使用kafka-node)
consumer.on('message',function (message) {
clientCass.execute(query,{ hints : ['int'] })
.then(result => console.log('User with computation%s',result.rows[0].computations))
.catch((message) => {
console.log('Could not find key')
});
});
这种方法似乎更好一些,因为每次用户单击页面时我都必须让消费者使用,而我只关心向它发送1条消息。
我想知道如何应对这一挑战?我可能错过了一个场景,还是有一种方法可以使用kafka-node解决此问题?我也在考虑做一个while循环,等待承诺成功,并且计算不是过时的(比较缓存中的值)
解决方法
这种方法似乎更好一些,因为每次用户单击页面时我都必须让消费者使用,而我只关心向它发送1条消息。
我会得出相同的结论。 Cassandra并非针对此类用例而设计。数据库最终是一致的。如果您将某物混在一起,当前的方法现在可能会起作用,但是一旦有了Cassandra集群,肯定会导致不确定的行为。尤其是当您更新条目时。
计算表中的ID是您的分区键。这意味着一旦有了集群,Cassandra就会通过id分发数据。看起来它仅包含一行。这是对Cassandra表建模的非常无效的方法。
您的用例看起来像一个用于会话存储或缓存的用例。 Redis或LevelDB非常适合此类用例。任何其他键值存储也可以完成这项工作。
为什么不将结果写入另一个主题,而又有另一个应用程序读取该主题并将结果写入数据库。这样您就无需保留任何状态。完成后,结果将在主题中。看起来像这样:
传入数据->第一个kafka主题->计算应用程序->第二个kafka主题->另一个将其写入数据库的应用程序
如果它在那里,那么还没有完成。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。