I want to test reconnecting to a RabbitMQ server, so I wrote a small test script: http://play.golang.org/p/l3ZWzG0Qqb But it does not work.

At iteration 10, I close the channel and the connection, open them again, and re-create the chan amqp.Confirmation (num: 75). Then the loop continues, but after that nothing arrives on the confirmation channel.

UPD: Here is the code.

    package main

    import (
    	"fmt"
    	"github.com/streadway/amqp"
    	"log"
    	"os"
    	"time"
    )

    const SERVER = "amqp://user:pass@localhost:5672/"
    const EXCHANGE_NAME = "publisher.test.1"
    const EXCHANGE_TYPE = "direct"
    const ROUTING_KEY = "publisher.test"

    var Connection *amqp.Connection
    var Channel *amqp.Channel

    func setup(url string) (*amqp.Connection, *amqp.Channel, error) {
    	conn, err := amqp.Dial(url)
    	if err != nil {
    		return nil, nil, err
    	}

    	ch, err := conn.Channel()
    	if err != nil {
    		return nil, nil, err
    	}

    	return conn, ch, nil
    }

    func main() {
    	url := SERVER

    	Connection, Channel, err := setup(url)
    	if err != nil {
    		fmt.Println("err publisher setup:", err)
    		return
    	}

    	confirms := Channel.NotifyPublish(make(chan amqp.Confirmation, 1))
    	if err := Channel.Confirm(false); err != nil {
    		log.Fatalf("confirm.select destination: %s", err)
    	}

    	for i := 1; i <= 3000000; i++ {
    		log.Println(i)

    		if err != nil {
    			fmt.Println("err consume:", err)
    			return
    		}

    		if err := Channel.Publish(EXCHANGE_NAME, ROUTING_KEY, false, false, amqp.Publishing{
    			Body: []byte(fmt.Sprintf("%d", i)),
    		}); err != nil {
    			fmt.Println("err publish:", err)
    			log.Printf("%+v", err)
    			os.Exit(1)
    			return
    		}

    		// only ack the source delivery when the destination acks the publishing
    		confirmed := <-confirms
    		if confirmed.Ack {
    			log.Printf("confirmed delivery with delivery tag: %d", confirmed.DeliveryTag)
    		} else {
    			log.Printf("failed delivery of delivery tag: %d", confirmed.DeliveryTag)
    			// TODO. Reconnect will be here
    		}

    		if i == 10 {
    			Channel.Close()
    			Connection.Close()
    			while := true
    			for while {
    				log.Println("while")
    				time.Sleep(time.Second * 1)

    				Connection, Channel, err = setup(url)
    				if err == nil {
    					while = false
    					confirms = Channel.NotifyPublish(make(chan amqp.Confirmation, 1))
    					log.Printf("%+v", confirms)
    				}
    			}
    		}

    		time.Sleep(time.Millisecond * 300)
    	}

    	os.Exit(1)
    }
  • Please post the code here. - mals
  • I don't see QueueDeclare or ExchangeDeclare anywhere. - mals
  • Yes. The queue and exchange are not declared, since that does not affect how the code works. Messages are published using only exchangeName and routingKey. The question is how to reconnect. The documentation has a rather unclear example that does not use chan Confirmation. Without that chan, the publisher does not check whether the message was actually delivered, and it keeps publishing even when the network is down (verified by experiment). - oke11o
  • There's no need to add [SOLVED] tags and the like; those are crutches. Here that is handled by the checkmark on answers. - D-side

1 answer

Solved.

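After reopening the channel, you also have to call the Channel.Confirm(false) method again. Confirm mode is enabled per channel, so the new channel does not inherit it from the one that was closed, and without it nothing is ever sent to the new chan amqp.Confirmation.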

    confirms := Channel.NotifyPublish(make(chan amqp.Confirmation, 1))
    if err := Channel.Confirm(false); err != nil {
    	log.Fatalf("confirm.select destination: %s", err)
    }