github.com/eclipse/paho.mqtt.golang
/* * Copyright (c) 2021 IBM Corp and others. * * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v2.0 * and Eclipse Distribution License v1.0 which accompany this distribution. * * The Eclipse Public License is available at * https://www.eclipse.org/legal/epl-2.0/ * and the Eclipse Distribution License is available at * http://www.eclipse.org/org/documents/edl-v10.php. * * Contributors: * Allan Stockdill-Mander */ package mqtt import ( "errors" "testing" "time" ) func TestWaitTimeout(t *testing.T) { b := baseToken{} if b.WaitTimeout(time.Second) { t.Fatal("Should have failed") } // Now lets confirm that WaitTimeout returns // setError() grabs the mutex which previously caused issues // when there is a result (it returns true in this case) b = baseToken{complete: make(chan struct{})} go func(bt *baseToken) { bt.setError(errors.New("test error")) }(&b) if !b.WaitTimeout(5 * time.Second) { t.Fatal("Should have succeeded") } }
/* * Copyright (c) 2021 IBM Corp and others. * * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v2.0 * and Eclipse Distribution License v1.0 which accompany this distribution. * * The Eclipse Public License is available at * https://www.eclipse.org/legal/epl-2.0/ * and the Eclipse Distribution License is available at * http://www.eclipse.org/org/documents/edl-v10.php. * * Contributors: * Allan Stockdill-Mander */ package mqtt import ( "sync" "time" "github.com/eclipse/paho.mqtt.golang/packets" ) // PacketAndToken is a struct that contains both a ControlPacket and a // Token. This struct is passed via channels between the client interface // code and the underlying code responsible for sending and receiving // MQTT messages. type PacketAndToken struct { p packets.ControlPacket t tokenCompletor } // Token defines the interface for the tokens used to indicate when // actions have completed. type Token interface { // Wait will wait indefinitely for the Token to complete, ie the Publish // to be sent and confirmed receipt from the broker. Wait() bool // WaitTimeout takes a time.Duration to wait for the flow associated with the // Token to complete, returns true if it returned before the timeout or // returns false if the timeout occurred. In the case of a timeout the Token // does not have an error set in case the caller wishes to wait again. WaitTimeout(time.Duration) bool // Done returns a channel that is closed when the flow associated // with the Token completes. Clients should call Error after the // channel is closed to check if the flow completed successfully. // // Done is provided for use in select statements. Simple use cases may // use Wait or WaitTimeout. Done() <-chan struct{} Error() error } type TokenErrorSetter interface { setError(error) } type tokenCompletor interface { Token TokenErrorSetter flowComplete() } type baseToken struct { m sync.RWMutex complete chan struct{} err error } // Wait implements the Token Wait method. func (b *baseToken) Wait() bool { <-b.complete return true } // WaitTimeout implements the Token WaitTimeout method. func (b *baseToken) WaitTimeout(d time.Duration) bool { timer := time.NewTimer(d) select { case <-b.complete: if !timer.Stop() { <-timer.C } return true case <-timer.C: } return false } // Done implements the Token Done method. func (b *baseToken) Done() <-chan struct{} { return b.complete } func (b *baseToken) flowComplete() { select { case <-b.complete: default: close(b.complete) } } func (b *baseToken) Error() error { b.m.RLock() defer b.m.RUnlock() return b.err } func (b *baseToken) setError(e error) { b.m.Lock() b.err = e b.flowComplete() b.m.Unlock() } func newToken(tType byte) tokenCompletor { switch tType { case packets.Connect: return &ConnectToken{baseToken: baseToken{complete: make(chan struct{})}} case packets.Subscribe: return &SubscribeToken{baseToken: baseToken{complete: make(chan struct{})}, subResult: make(map[string]byte)} case packets.Publish: return &PublishToken{baseToken: baseToken{complete: make(chan struct{})}} case packets.Unsubscribe: return &UnsubscribeToken{baseToken: baseToken{complete: make(chan struct{})}} case packets.Disconnect: return &DisconnectToken{baseToken: baseToken{complete: make(chan struct{})}} } return nil } // ConnectToken is an extension of Token containing the extra fields // required to provide information about calls to Connect() type ConnectToken struct { baseToken returnCode byte sessionPresent bool } // ReturnCode returns the acknowledgement code in the connack sent // in response to a Connect() func (c *ConnectToken) ReturnCode() byte { c.m.RLock() defer c.m.RUnlock() return c.returnCode } // SessionPresent returns a bool representing the value of the // session present field in the connack sent in response to a Connect() func (c *ConnectToken) SessionPresent() bool { c.m.RLock() defer c.m.RUnlock() return c.sessionPresent } // PublishToken is an extension of Token containing the extra fields // required to provide information about calls to Publish() type PublishToken struct { baseToken messageID uint16 } // MessageID returns the MQTT message ID that was assigned to the // Publish packet when it was sent to the broker func (p *PublishToken) MessageID() uint16 { return p.messageID } // SubscribeToken is an extension of Token containing the extra fields // required to provide information about calls to Subscribe() type SubscribeToken struct { baseToken subs []string subResult map[string]byte messageID uint16 } // Result returns a map of topics that were subscribed to along with // the matching return code from the broker. This is either the Qos // value of the subscription or an error code. func (s *SubscribeToken) Result() map[string]byte { s.m.RLock() defer s.m.RUnlock() return s.subResult } // UnsubscribeToken is an extension of Token containing the extra fields // required to provide information about calls to Unsubscribe() type UnsubscribeToken struct { baseToken messageID uint16 } // DisconnectToken is an extension of Token containing the extra fields // required to provide information about calls to Disconnect() type DisconnectToken struct { baseToken }
去除业务,这个token包主要是为了表示动作完成的操作
可以综合mqtt文档看一下mqtt包控制报文类型
// mqtt 控制包类型 // Below are the constants assigned to each of the MQTT packet type // 下面来自mqtt 中文文档 //Table 2.1 - Control packet types // //|Name |Value |Direction of flow |Description //|Reserved |0 |Forbidden |Reserved //|CONNECT |1 |Client to Server | //|CONNACK |2 |Server to Client |Connect acknowledgment //|PUBLISH |3 |Client to Server or Server to Client |Publish message //|PUBACK |4 |Client to Server or Server to Client |Publish acknowledgment //|PUBREC |5 |Client to Server or Server to Client |Publish received (assured delivery part 1) //|PUBREL |6 |Client to Server or Server to Client |Publish release (assured delivery part 2) //|PUBCOMP |7 |Client to Server or Server to Client |Publish complete (assured delivery part 3) //|SUBSCRIBE |8 |Client to Server |Client subscribe request //|SUBACK |9 |Server to Client |Subscribe acknowledgment //|UNSUBSCRIBE |10 |Client to Server |Unsubscribe request //|UNSUBACK |11 |Server to Client |Unsubscribe acknowledgment //|PINGREQ |12 |Client to Server |PING request //|PINGRESP |13 |Server to Client |PING response //|DISCONNECT |14 |Client to Server |Client is disconnecting //|Reserved |15 |Forbidden |Reserved const ( Connect = 1 Connack = 2 Publish = 3 Puback = 4 Pubrec = 5 Pubrel = 6 Pubcomp = 7 Subscribe = 8 Suback = 9 Unsubscribe = 10 Unsuback = 11 Pingreq = 12 Pingresp = 13 Disconnect = 14 )