-
Notifications
You must be signed in to change notification settings - Fork 13
/
doc.go
71 lines (70 loc) · 2.81 KB
/
doc.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
/*
Package nsq is the official Go package for NSQ (http://nsq.io/).
It provides high-level Consumer and Producer types as well as low-level
functions to communicate over the NSQ protocol.
Consumer
Consuming messages from NSQ can be done by creating an instance of a Consumer and supplying it a handler.
type myMessageHandler struct {}
// HandleMessage implements the Handler interface.
func (h *myMessageHandler) HandleMessage(m *nsq.Message) error {
if len(m.Body) == 0 {
// Returning nil will automatically send a FIN command to NSQ to mark the message as processed.
return nil
}
err := processMessage(m.Body)
// Returning a non-nil error will automatically send a REQ command to NSQ to re-queue the message.
return err
}
func main() {
// Instantiate a consumer that will subscribe to the provided channel.
config := nsq.NewConfig()
consumer, err := nsq.NewConsumer("topic", "channel", config)
if err != nil {
log.Fatal(err)
}
// Set the Handler for messages received by this Consumer. Can be called multiple times.
// See also AddConcurrentHandlers.
consumer.AddHandler(&myMessageHandler{})
// Use nsqlookupd to discover nsqd instances.
// See also ConnectToNSQD, ConnectToNSQDs, ConnectToNSQLookupds.
err = consumer.ConnectToNSQLookupd("localhost:4161")
if err != nil {
log.Fatal(err)
}
// Gracefully stop the consumer.
consumer.Stop()
}
Producer
Producing messages can be done by creating an instance of a Producer.
// Instantiate a producer.
config := nsq.NewConfig()
producer, err := nsq.NewProducer("127.0.0.1:4150", config)
if err != nil {
log.Fatal(err)
}
messageBody := []byte("hello")
topicName := "topic"
// Synchronously publish a single message to the specified topic.
// Messages can also be sent asynchronously and/or in batches.
err = producer.Publish(topicName, messageBody)
if err != nil {
log.Fatal(err)
}
// Gracefully stop the producer.
producer.Stop()
Client message compression
A Producer is able to compress messages before publishing them to an extend topic, powered by Youzan NSQ.
The message compression codec is added to the message header, and a consumer client that supports message
decompression decompresses each message according to that codec.
Basically, no extra configuration is needed for a consumer to support message decompression, but make sure
that ALL consumers on the topic's channels support client message decompression BEFORE the producer enables
compression for the topic.
For the producer, add the following settings to the config to enable message compression:
config := nsq.NewConfig()
// Set the message compression codec to lz4; other available codecs are gzip and snappy.
config.ClientCompressDecodec = "lz4"
// Specify the minimum message size, in bytes, that triggers compression before the message is sent.
config.MessageSizeForCompress = 30 * 1024
// Specify the topics whose messages should be compressed.
config.TopicsForCompress = []string{"topic1", "topic2"}
*/
package nsq