如何解决Google Pub / Sub消息订购不起作用或将延迟增加到10秒以上?
我正在尝试做一个简化的示例,演示Google Pub / Sub的消息排序功能(https://cloud.google.com/pubsub/docs/ordering)的使用。在这些文档中,为订阅启用消息排序后,
设置了消息订购属性之后,发布/订阅服务以与发布/订阅服务接收消息的顺序相同的订购密钥传递消息。例如,如果发布者使用相同的订购密钥发送两条消息,则发布/订阅服务将首先传递最早的消息。
我用它来编写以下示例:
A
首先,我尝试了该程序,但未在package main
import (
"context"
"log"
"time"
"cloud.google.com/go/pubsub"
uuid "github.com/satori/go.uuid"
)
func main() {
client,err := pubsub.NewClient(context.Background(),"my-project")
if err != nil {
log.Fatalf("NewClient: %v",err)
}
topicID := "test-topic-" + uuid.NewV4().String()
topic,err := client.CreateTopic(context.Background(),topicID)
if err != nil {
log.Fatalf("CreateTopic: %v",err)
}
defer topic.Delete(context.Background())
subID := "test-subscription-" + uuid.NewV4().String()
sub,err := client.CreateSubscription(context.Background(),subID,pubsub.SubscriptionConfig{
Topic: topic,EnableMessageOrdering: true,})
if err != nil {
log.Fatalf("CreateSubscription: %v",err)
}
defer sub.Delete(context.Background())
ctx,cancel := context.WithCancel(context.Background())
defer cancel()
messageReceived := make(chan struct{})
go sub.Receive(ctx,func(ctx context.Context,msg *pubsub.Message) {
log.Printf("Received message with ordering key %s: %s",msg.OrderingKey,msg.Data)
msg.Ack()
messageReceived <- struct{}{}
})
topic.Publish(context.Background(),&pubsub.Message{Data: []byte("Dang1!"),OrderingKey: "foobar"})
topic.Publish(context.Background(),&pubsub.Message{Data: []byte("Dang2!"),OrderingKey: "foobar"})
for i := 0; i < 2; i++ {
select {
case <-messageReceived:
case <-time.After(10 * time.Second):
log.Fatal("Expected to receive a message,but timed out after 10 seconds.")
}
}
}
调用中指定OrderingKey: "foobar"
。这产生了以下输出:
topic.Publish()
换句话说,接收消息的顺序与发布时的顺序不同,在我的用例中,这是不可取的,我想通过指定> go run main.go
2020/08/10 21:40:34 Received message with ordering key : Dang2!
2020/08/10 21:40:34 Received message with ordering key : Dang1!
但是,一旦我在发布调用中添加了OrderingKey
,该程序就会在等待接收发布/订阅消息10秒钟后超时:
OrderingKey
我希望现在首先收到消息> go run main.go
2020/08/10 21:44:36 Expected to receive a message,but timed out after 10 seconds.
exit status 1
,然后再收到Dang1!
,但是我没有收到任何消息。知道为什么这种情况不会发生吗?
解决方法
发布失败,并出现以下错误:Failed to publish: Topic.EnableMessageOrdering=false,but an OrderingKey was set in Message. Please remove the OrderingKey or turn on Topic.EnableMessageOrdering
。
如果您更改发布呼叫以检查错误,则可以看到此信息:
res1 := topic.Publish(context.Background(),&pubsub.Message{Data: []byte("Dang1!"),OrderingKey: "foobar"})
res2 := topic.Publish(context.Background(),&pubsub.Message{Data: []byte("Dang2!"),OrderingKey: "foobar"})
_,err = res1.Get(ctx)
if err != nil {
fmt.Printf("Failed to publish: %v",err)
return
}
_,err = res2.Get(ctx)
if err != nil {
fmt.Printf("Failed to publish: %v",err)
return
}
要修复此问题,请添加一行以启用您主题上的消息排序。您的主题创建如下:
topic,err := client.CreateTopic(context.Background(),topicID)
if err != nil {
log.Fatalf("CreateTopic: %v",err)
}
topic.EnableMessageOrdering = true
defer topic.Delete(context.Background())
,
我独立地提出了与Kamal相同的解决方案,只是想分享完整的修订实施:
package main
import (
"context"
"flag"
"log"
"time"
"cloud.google.com/go/pubsub"
uuid "github.com/satori/go.uuid"
)
var enableMessageOrdering bool
func main() {
flag.BoolVar(&enableMessageOrdering,"enableMessageOrdering",false,"Enable and use Pub/Sub message ordering")
flag.Parse()
client,err := pubsub.NewClient(context.Background(),"fleetsmith-dev")
if err != nil {
log.Fatalf("NewClient: %v",err)
}
topicID := "test-topic-" + uuid.NewV4().String()
topic,topicID)
if err != nil {
log.Fatalf("CreateTopic: %v",err)
}
topic.EnableMessageOrdering = enableMessageOrdering
defer topic.Delete(context.Background())
subID := "test-subscription-" + uuid.NewV4().String()
sub,err := client.CreateSubscription(context.Background(),subID,pubsub.SubscriptionConfig{
Topic: topic,EnableMessageOrdering: enableMessageOrdering,})
if err != nil {
log.Fatalf("CreateSubscription: %v",err)
}
defer sub.Delete(context.Background())
ctx,cancel := context.WithCancel(context.Background())
defer cancel()
messageReceived := make(chan struct{})
go sub.Receive(ctx,func(ctx context.Context,msg *pubsub.Message) {
log.Printf("Received message with ordering key %s: %s",msg.OrderingKey,msg.Data)
msg.Ack()
messageReceived <- struct{}{}
})
msg1,msg2 := &pubsub.Message{Data: []byte("Dang1!")},&pubsub.Message{Data: []byte("Dang2!")}
if enableMessageOrdering {
msg1.OrderingKey,msg2.OrderingKey = "foobar","foobar"
}
publishMessage(topic,msg1)
publishMessage(topic,msg2)
for i := 0; i < 2; i++ {
select {
case <-messageReceived:
case <-time.After(10 * time.Second):
log.Fatal("Expected to receive a message,but timed out after 10 seconds.")
}
}
}
func publishMessage(topic *pubsub.Topic,msg *pubsub.Message) {
publishResult := topic.Publish(context.Background(),msg)
messageID,err := publishResult.Get(context.Background())
if err != nil {
log.Fatalf("Get: %v",err)
}
log.Printf("Published message with ID %s",messageID)
}
在将enableMessageOrdering
标志设置为true
的情况下被调用时,我首先收到Dang1!
,然后收到Dang2!
:
> go run main.go --enableMessageOrdering
2020/08/11 05:38:07 Published message with ID 1420685949616723
2020/08/11 05:38:08 Published message with ID 1420726763302425
2020/08/11 05:38:09 Received message with ordering key foobar: Dang1!
2020/08/11 05:38:11 Received message with ordering key foobar: Dang2!
如果没有它,我会像以前一样以相反的顺序收到它们:
> go run main.go
2020/08/11 05:38:47 Published message with ID 1420687395091051
2020/08/11 05:38:47 Published message with ID 1420693737065665
2020/08/11 05:38:48 Received message with ordering key : Dang2!
2020/08/11 05:38:48 Received message with ordering key : Dang1!
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。