生成子 key:channel 中使用通配符 `#`(例如 `shuguo/#/`)

订阅:channel 中使用通配符 `+`(例如 `shuguo/aaa/unicast/+/`)

package main

import (
	"encoding/json"
	"fmt"
	"os"
	"time"
	emitter "github.com/emitter-io/go"
)

// MyBody is the JSON document decoded from each received message payload.
// The JSON keys are spelled out explicitly and match the field names.
type MyBody struct {
	// Id is the message sequence number; the receiver checks it against the
	// previously seen value.
	Id uint64 `json:"Id"`
	// Body is the message content as a string.
	Body string `json:"Body"`
}

// localID holds the Id of the most recent message accepted by the message
// handler in main; the handler requires each new Id to equal localID + 1.
var localID uint64

func main() {
	localID = 0
	var mykey string
	rMsg := &MyBody{}
	o := emitter.NewClientOptions()
	o.SetOnMessageHandler(func(_ emitter.Emitter, msg emitter.Message) {
		fmt.Printf("Received message: %s
", msg.Payload())
		err := json.Unmarshal(msg.Payload(), rMsg)
		if err != nil {
			fmt.Println("parse json data err!")
		}
		fmt.Printf("Received message: %d
", rMsg.Id)

		if (localID + 1) != rMsg.Id {
			fmt.Println("seq err!")
			os.Exit(1)
		}
		localID = rMsg.Id
		//fmt.Printf("Received message: %d
", rMsg.Id)
	})
	o.SetOnKeyGenHandler(func(_ emitter.Emitter, key emitter.KeyGenResponse) {
		fmt.Printf("get key: %s
", key)
		mykey = key.Key
	})

	o.ClientID = "tttt_123456"
	o.AddBroker("tcp://192.168.162.34:49250")
	o.AddBroker("tcp://192.168.163.200:8081")
	o.AddBroker("tcp://119.3.108.139:8081")
	c := emitter.NewClient(o)
	sToken := c.Connect()
	if sToken.Wait() && sToken.Error() != nil {
		panic("Error on Client.Connect(): " + sToken.Error().Error())
	}

	getKey := emitter.NewKeyGenRequest()
	getKey.Channel = "shuguo/#/"
	getKey.Key = "m1dBYWPaHcabBXb-p4YJkqKWYO_-TSrt" // 200server
	getKey.TTL = 0
	getKey.Type = "rwlsp"
	sToken = c.GenerateKey(getKey)
	if sToken.Wait() && sToken.Error() != nil {
		panic("Error on client generateKey(): " + sToken.Error().Error())
	}
	time.Sleep(2 * time.Second)
	//c.Subscribe(mykey, "shuguo/unicast/user@111/")
	c.Subscribe(mykey, "shuguo/aaa/unicast/+/")
	time.Sleep(1 * time.Second)
	c.Publish(mykey, "shuguo/unicast/user@111", "hello")
	//c.Publish(mykey, "shuguo/aaa/unicast/user/aa/fff/aaf/dip/ggf", "hello")
	//c.Publish(mykey, "shuguo/aaa/unicast/user/aa/fff/aaf/dip", "world")
	//c.Publish(mykey, "shuguo/aaa/unicast/user/aa/fff/aaf", "!")
	//c.Publish(mykey, "shuguo/aaa/unicast/user/aa/fff@8795", "2020")
	select {}
}

emitter 的 client 库是对 paho.mqtt.golang 的封装;直接使用 paho.mqtt.golang 库也可以对接 emitter 服务端(需要在 topic 前携带 subKey)。

package main

import (
	"fmt"
	"log"
	"os"
	"time"

	"github.com/eclipse/paho.mqtt.golang"
)

var f mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
	fmt.Printf("TOPIC: %s
", msg.Topic())
	fmt.Printf("MSG: %s
", msg.Payload())
}

// main talks to the emitter broker through the plain paho MQTT client: it
// connects, subscribes to one topic, publishes five messages to that same
// topic, waits three seconds, unsubscribes and disconnects.
//
// The topic is the sub-key followed by the channel name.
func main() {
	// One topic is used for subscribe, publish and unsubscribe; keep it in a
	// single place so the three calls cannot drift apart.
	const topic = "1EOr2RdDssJyOyu_GuRvI0j68c0fH9Hz/shuguo/unicast/user@111/"

	// Send the client library's debug and error logs to stdout.
	mqtt.DEBUG = log.New(os.Stdout, "", 0)
	mqtt.ERROR = log.New(os.Stdout, "", 0)

	opts := mqtt.NewClientOptions().AddBroker("tcp://192.168.162.34:49250").SetClientID("vm-11111")
	opts.SetKeepAlive(2 * time.Second)
	opts.SetDefaultPublishHandler(f)
	opts.SetPingTimeout(1 * time.Second)

	c := mqtt.NewClient(opts)
	if token := c.Connect(); token.Wait() && token.Error() != nil {
		panic(token.Error())
	}

	if token := c.Subscribe(topic, 0, nil); token.Wait() && token.Error() != nil {
		fmt.Println(token.Error())
		os.Exit(1)
	}

	for i := 0; i < 5; i++ {
		text := fmt.Sprintf("this is msg #%d!", i)
		// Report a failed publish instead of dropping the error, then carry on
		// with the remaining messages.
		if token := c.Publish(topic, 0, false, text); token.Wait() && token.Error() != nil {
			fmt.Println(token.Error())
		}
	}

	// Pause before unsubscribing, as the original flow did.
	time.Sleep(3 * time.Second)

	if token := c.Unsubscribe(topic); token.Wait() && token.Error() != nil {
		fmt.Println(token.Error())
		os.Exit(1)
	}

	// 250 is the quiesce time in milliseconds passed to Disconnect.
	c.Disconnect(250)

	time.Sleep(1 * time.Second)
}