I want to check the mechanism of restarting the connection to the rabbitmq server. I wrote a small script for the test. http://play.golang.org/p/l3ZWzG0Qqb But it does not work.
In step 10, I close the channel and connection. And I open them again. And re-create chan amqp.Confirmation (num: 75). And I continue the cycle. But after that, nothing comes from the channel.
UPD: I post the code here.
package main import ( "fmt" "github.com/streadway/amqp" "log" "os" "time" ) const SERVER = "amqp://user:pass@localhost:5672/" const EXCHANGE_NAME = "publisher.test.1" const EXCHANGE_TYPE = "direct" const ROUTING_KEY = "publisher.test" var Connection *amqp.Connection var Channel *amqp.Channel func setup(url string) (*amqp.Connection, *amqp.Channel, error) { conn, err := amqp.Dial(url) if err != nil { return nil, nil, err } ch, err := conn.Channel() if err != nil { return nil, nil, err } return conn, ch, nil } func main() { url := SERVER Connection, Channel, err := setup(url) if err != nil { fmt.Println("err publisher setup:", err) return } confirms := Channel.NotifyPublish(make(chan amqp.Confirmation, 1)) if err := Channel.Confirm(false); err != nil { log.Fatalf("confirm.select destination: %s", err) } for i := 1; i <= 3000000; i++ { log.Println(i) if err != nil { fmt.Println("err consume:", err) return } if err := Channel.Publish(EXCHANGE_NAME, ROUTING_KEY, false, false, amqp.Publishing{ Body: []byte(fmt.Sprintf("%d", i)), }); err != nil { fmt.Println("err publish:", err) log.Printf("%+v", err) os.Exit(1) return } // only ack the source delivery when the destination acks the publishing confirmed := <-confirms if confirmed.Ack { log.Printf("confirmed delivery with delivery tag: %d", confirmed.DeliveryTag) } else { log.Printf("failed delivery of delivery tag: %d", confirmed.DeliveryTag) // TODO. Reconnect will be here } if i == 10 { Channel.Close() Connection.Close() while := true for while { log.Println("while") time.Sleep(time.Second * 1) Connection, Channel, err = setup(url) if err == nil { while = false confirms = Channel.NotifyPublish(make(chan amqp.Confirmation, 1)) log.Printf("%+v", confirms) } } } time.Sleep(time.Millisecond * 300) } os.Exit(1) }
[РЕШЕНО]and analogs, it's all crutches. We have this provided by the checkbox on the answers. - D-side