Skip to content

Commit

Permalink
Upgrade MQTT to mochi-mqtt v2.4.0
Browse files Browse the repository at this point in the history
  • Loading branch information
werbenhu committed Sep 19, 2023
1 parent 59a0774 commit 0186dcb
Show file tree
Hide file tree
Showing 68 changed files with 2,469 additions and 954 deletions.
33 changes: 17 additions & 16 deletions mqtt/clients.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2022 J. Blake / mochi-co
// SPDX-FileCopyrightText: 2023 mochi-mqtt, mochi-co
// SPDX-FileContributor: mochi-co, wind

package mqtt
Expand Down Expand Up @@ -104,13 +104,13 @@ func (cl *Clients) GetByListener(id string) []*Client {

// Client contains information about a client known by the broker.
type Client struct {
ops *ops // ops provides a reference to server ops.
Properties ClientProperties // client properties
State ClientState // the operational state of the client.
Net ClientConnection // network connection state of the client
ID string // the client id.
Net ClientConnection // network connection state of the clinet
Properties ClientProperties // client properties
InheritWay int // session inheritance way
ops *ops // ops provides a reference to server ops.
sync.RWMutex // mutex
InheritWay int // session inheritance way
}

// ClientConnection contains the connection transport and metadata for the client.
Expand All @@ -119,42 +119,42 @@ type ClientConnection struct {
bconn *bufio.ReadWriter // a buffered net.Conn for reading packets
Remote string // the remote address of the client
Listener string // listener id of the client
Inline bool // client is an inline programmetic client
Inline bool // if true, the client is the built-in 'inline' embedded client
}

// ClientProperties contains the properties which define the client behaviour.
type ClientProperties struct {
Username []byte
Will Will
Props packets.Properties
Will Will
Username []byte
ProtocolVersion byte
Clean bool
}

// Will contains the last will and testament details for a client connection.
type Will struct {
TopicName string // -
Payload []byte // -
User []packets.UserProperty // -
TopicName string // -
Flag uint32 // 0,1
WillDelayInterval uint32 // -
Qos byte // -
Retain bool // -
}

// State tracks the state of the client.
// ClientState tracks the state of the client.
type ClientState struct {
TopicAliases TopicAliases // a map of topic aliases
stopCause atomic.Value // reason for stopping
open context.Context // indicate that the client is open for packet exchange
Subscriptions *Subscriptions // a map of the subscription filters a client maintains
outbound chan *packets.Packet // queue for pending outbound packets
Inflight *Inflight // a map of in-flight qos messages
cancelOpen context.CancelFunc // cancel function for open context
Subscriptions *Subscriptions // a map of the subscription filters a client maintains
disconnected int64 // the time the client disconnected in unix time, for calculating expiry
outbound chan *packets.Packet // queue for pending outbound packets
endOnce sync.Once // only end once
isTakenOver uint32 // used to identify orphaned clients
packetID uint32 // the current highest packetID
open context.Context // indicate that the client is open for packet exchange
cancelOpen context.CancelFunc // cancel function for open context
outboundQty int32 // number of messages currently in the outbound queue
Keepalive uint16 // the number of seconds the connection can wait
ServerKeepalive bool // keepalive was set by the server
Expand Down Expand Up @@ -200,7 +200,8 @@ func (cl *Client) WriteLoop() {
select {
case pk := <-cl.State.outbound:
if err := cl.WritePacket(*pk); err != nil {
cl.ops.log.Debug().Err(err).Str("client", cl.ID).Interface("packet", pk).Msg("failed publishing packet")
// TODO : Figure out what to do with error
cl.ops.log.Debug("failed publishing packet", "error", err, "client", cl.ID, "packet", pk)
}
atomic.AddInt32(&cl.State.outboundQty, -1)
case <-cl.State.open.Done():
Expand Down Expand Up @@ -318,7 +319,7 @@ func (cl *Client) ResendInflightMessages(force bool) error {
return nil
}

// ClearInflights deletes all inflight messages for the client, eg. for a disconnected user with a clean session.
// ClearInflights deletes all inflight messages for the client, e.g. for a disconnected user with a clean session.
func (cl *Client) ClearInflights(now, maximumExpiry int64) []uint16 {
deleted := []uint16{}
for _, tk := range cl.State.Inflight.GetAll(false) {
Expand Down
60 changes: 30 additions & 30 deletions mqtt/clients_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2022 mochi-co
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
// SPDX-FileContributor: mochi-co

package mqtt
Expand Down Expand Up @@ -29,7 +29,7 @@ func newTestClient() (cl *Client, r net.Conn, w net.Conn) {
cl = newClient(w, &ops{
info: new(system.Info),
hooks: new(Hooks),
log: &logger,
log: logger,
options: &Options{
Capabilities: &Capabilities{
ReceiveMaximum: 10,
Expand Down Expand Up @@ -263,7 +263,7 @@ func TestClientNextPacketIDOverflow(t *testing.T) {
cl.State.Inflight.internal[uint16(i)] = packets.Packet{}
}

cl.State.packetID = uint32(cl.ops.options.Capabilities.maximumPacketID - 1)
cl.State.packetID = cl.ops.options.Capabilities.maximumPacketID - 1
i, err := cl.NextPacketID()
require.NoError(t, err)
require.Equal(t, cl.ops.options.Capabilities.maximumPacketID, i)
Expand Down Expand Up @@ -303,7 +303,7 @@ func TestClientResendInflightMessages(t *testing.T) {
err := cl.ResendInflightMessages(true)
require.NoError(t, err)
time.Sleep(time.Millisecond)
w.Close()
_ = w.Close()
}()

buf, err := io.ReadAll(r)
Expand All @@ -315,7 +315,7 @@ func TestClientResendInflightMessages(t *testing.T) {
func TestClientResendInflightMessagesWriteFailure(t *testing.T) {
pk1 := packets.TPacketData[packets.Publish].Get(packets.TPublishQos1Dup)
cl, r, _ := newTestClient()
r.Close()
_ = r.Close()

cl.State.Inflight.Set(*pk1.Packet)
require.Equal(t, 1, cl.State.Inflight.Len())
Expand All @@ -342,8 +342,8 @@ func TestClientReadFixedHeader(t *testing.T) {

defer cl.Stop(errClientStop)
go func() {
r.Write([]byte{packets.Connect << 4, 0x00})
r.Close()
_, _ = r.Write([]byte{packets.Connect << 4, 0x00})
_ = r.Close()
}()

fh := new(packets.FixedHeader)
Expand All @@ -357,8 +357,8 @@ func TestClientReadFixedHeaderDecodeError(t *testing.T) {
defer cl.Stop(errClientStop)

go func() {
r.Write([]byte{packets.Connect<<4 | 1<<1, 0x00, 0x00})
r.Close()
_, _ = r.Write([]byte{packets.Connect<<4 | 1<<1, 0x00, 0x00})
_ = r.Close()
}()

fh := new(packets.FixedHeader)
Expand All @@ -372,8 +372,8 @@ func TestClientReadFixedHeaderPacketOversized(t *testing.T) {
defer cl.Stop(errClientStop)

go func() {
r.Write(packets.TPacketData[packets.Publish].Get(packets.TPublishQos1Dup).RawBytes)
r.Close()
_, _ = r.Write(packets.TPacketData[packets.Publish].Get(packets.TPublishQos1Dup).RawBytes)
_ = r.Close()
}()

fh := new(packets.FixedHeader)
Expand All @@ -387,7 +387,7 @@ func TestClientReadFixedHeaderReadEOF(t *testing.T) {
defer cl.Stop(errClientStop)

go func() {
r.Close()
_ = r.Close()
}()

fh := new(packets.FixedHeader)
Expand All @@ -401,8 +401,8 @@ func TestClientReadFixedHeaderNoLengthTerminator(t *testing.T) {
defer cl.Stop(errClientStop)

go func() {
r.Write([]byte{packets.Connect << 4, 0xd5, 0x86, 0xf9, 0x9e, 0x01})
r.Close()
_, _ = r.Write([]byte{packets.Connect << 4, 0xd5, 0x86, 0xf9, 0x9e, 0x01})
_ = r.Close()
}()

fh := new(packets.FixedHeader)
Expand All @@ -414,7 +414,7 @@ func TestClientReadOK(t *testing.T) {
cl, r, _ := newTestClient()
defer cl.Stop(errClientStop)
go func() {
r.Write([]byte{
_, _ = r.Write([]byte{
packets.Publish << 4, 18, // Fixed header
0, 5, // Topic Name - LSB+MSB
'a', '/', 'b', '/', 'c', // Topic Name
Expand All @@ -424,7 +424,7 @@ func TestClientReadOK(t *testing.T) {
'd', '/', 'e', '/', 'f', // Topic Name
'y', 'e', 'a', 'h', // Payload
})
r.Close()
_ = r.Close()
}()

var pks []packets.Packet
Expand Down Expand Up @@ -499,10 +499,10 @@ func TestClientReadFixedHeaderError(t *testing.T) {
cl, r, _ := newTestClient()
defer cl.Stop(errClientStop)
go func() {
r.Write([]byte{
_, _ = r.Write([]byte{
packets.Publish << 4, 11, // Fixed header
})
r.Close()
_ = r.Close()
}()

cl.Net.bconn = nil
Expand All @@ -516,13 +516,13 @@ func TestClientReadReadHandlerErr(t *testing.T) {
cl, r, _ := newTestClient()
defer cl.Stop(errClientStop)
go func() {
r.Write([]byte{
_, _ = r.Write([]byte{
packets.Publish << 4, 11, // Fixed header
0, 5, // Topic Name - LSB+MSB
'd', '/', 'e', '/', 'f', // Topic Name
'y', 'e', 'a', 'h', // Payload
})
r.Close()
_ = r.Close()
}()

err := cl.Read(func(cl *Client, pk packets.Packet) error {
Expand All @@ -536,13 +536,13 @@ func TestClientReadReadPacketOK(t *testing.T) {
cl, r, _ := newTestClient()
defer cl.Stop(errClientStop)
go func() {
r.Write([]byte{
_, _ = r.Write([]byte{
packets.Publish << 4, 11, // Fixed header
0, 5,
'd', '/', 'e', '/', 'f',
'y', 'e', 'a', 'h',
})
r.Close()
_ = r.Close()
}()

fh := new(packets.FixedHeader)
Expand Down Expand Up @@ -573,7 +573,7 @@ func TestClientReadPacket(t *testing.T) {
t.Run(tt.Desc, func(t *testing.T) {
atomic.StoreInt64(&cl.ops.info.PacketsReceived, 0)
go func() {
r.Write(tt.RawBytes)
_, _ = r.Write(tt.RawBytes)
}()

fh := new(packets.FixedHeader)
Expand All @@ -600,7 +600,7 @@ func TestClientReadPacket(t *testing.T) {

func TestClientReadPacketInvalidTypeError(t *testing.T) {
cl, _, _ := newTestClient()
cl.Net.Conn.Close()
_ = cl.Net.Conn.Close()
_, err := cl.ReadPacket(&packets.FixedHeader{})
require.Error(t, err)
require.Contains(t, err.Error(), "invalid packet type")
Expand All @@ -624,7 +624,7 @@ func TestClientWritePacket(t *testing.T) {
require.NoError(t, err, pkInfo, tt.Case, tt.Desc)

time.Sleep(2 * time.Millisecond)
cl.Net.Conn.Close()
_ = cl.Net.Conn.Close()

require.Equal(t, tt.RawBytes, <-o, pkInfo, tt.Case, tt.Desc)

Expand Down Expand Up @@ -660,13 +660,13 @@ func TestClientReadPacketReadingError(t *testing.T) {
cl, r, _ := newTestClient()
defer cl.Stop(errClientStop)
go func() {
r.Write([]byte{
_, _ = r.Write([]byte{
0, 11, // Fixed header
0, 5,
'd', '/', 'e', '/', 'f',
'y', 'e', 'a', 'h',
})
r.Close()
_ = r.Close()
}()

_, err := cl.ReadPacket(&packets.FixedHeader{
Expand All @@ -680,13 +680,13 @@ func TestClientReadPacketReadUnknown(t *testing.T) {
cl, r, _ := newTestClient()
defer cl.Stop(errClientStop)
go func() {
r.Write([]byte{
_, _ = r.Write([]byte{
0, 11, // Fixed header
0, 5,
'd', '/', 'e', '/', 'f',
'y', 'e', 'a', 'h',
})
r.Close()
_ = r.Close()
}()

_, err := cl.ReadPacket(&packets.FixedHeader{
Expand All @@ -706,7 +706,7 @@ func TestClientWritePacketWriteNoConn(t *testing.T) {

func TestClientWritePacketWriteError(t *testing.T) {
cl, _, _ := newTestClient()
cl.Net.Conn.Close()
_ = cl.Net.Conn.Close()

err := cl.WritePacket(*pkTable[1].Packet)
require.Error(t, err)
Expand Down
9 changes: 5 additions & 4 deletions mqtt/cmd/main.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2022 mochi-co
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
// SPDX-FileContributor: mochi-co

package main
Expand Down Expand Up @@ -68,7 +68,8 @@ func main() {
}()

<-done
server.Log.Warn().Msg("caught signal, stopping...")
server.Close()
server.Log.Info().Msg("main.go finished")
server.Log.Warn("caught signal, stopping...")
_ = server.Close()
server.Log.Info("main.go finished")

}
8 changes: 4 additions & 4 deletions mqtt/examples/auth/basic/main.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2022 mochi-co
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
// SPDX-FileContributor: mochi-co

package main
Expand Down Expand Up @@ -77,7 +77,7 @@ func main() {
}()

<-done
server.Log.Warn().Msg("caught signal, stopping...")
server.Close()
server.Log.Info().Msg("main.go finished")
server.Log.Warn("caught signal, stopping...")
_ = server.Close()
server.Log.Info("main.go finished")
}
8 changes: 4 additions & 4 deletions mqtt/examples/auth/encoded/main.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2022 mochi-co
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
// SPDX-FileContributor: mochi-co

package main
Expand Down Expand Up @@ -59,7 +59,7 @@ func main() {
}()

<-done
server.Log.Warn().Msg("caught signal, stopping...")
server.Close()
server.Log.Info().Msg("main.go finished")
server.Log.Warn("caught signal, stopping...")
_ = server.Close()
server.Log.Info("main.go finished")
}
8 changes: 4 additions & 4 deletions mqtt/examples/benchmark/main.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2022 mochi-co
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
// SPDX-FileContributor: mochi-co

package main
Expand Down Expand Up @@ -45,7 +45,7 @@ func main() {
}()

<-done
server.Log.Warn().Msg("caught signal, stopping...")
server.Close()
server.Log.Info().Msg("main.go finished")
server.Log.Warn("caught signal, stopping...")
_ = server.Close()
server.Log.Info("main.go finished")
}
Loading

0 comments on commit 0186dcb

Please sign in to comment.