golang连接activemq

config.ini 配置文件

[stomp]
;activemq的IP地址
host:192.168.7.85 
;activemq的端口
port:61613 
;activemq的队列
queue:/queue/bbg_ordercache 
[php]
;php的执行路径
phpbin:php.exe
;被执行的文件的路径 
filepath:D:/jianguo/command/application/cli  
;传递给被执行文件的参数
params:show


main.go 代码文件:

package main
import (
	"bytes"
	"fmt"
	"github.com/gmallard/stompngo"
	"github.com/unknwon/goconfig"
	"log"
	"net"
	"os"
	"os/exec"
)
// 存储配置信息的变量
var config *goconfig.ConfigFile
// 存储日志信息的变量
var mylog *log.Logger
// 启动初始化
func init() {
	// 加载配置文件
	configPath := "./config.ini"
	conf, err := goconfig.LoadConfigFile(configPath)
	if err != nil {
		fmt.Println(err)
	}
	config = conf
	// @todo 强化按日期写日志文件
	file := "./log.txt"
	//t := time.Now()
	//file := "./log_" + strings.Replace(t.String()[:19], ":", "_", 3) + ".txt"
	hander, err := os.OpenFile(file, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0666)
	if err != nil {
		log.Println(err)
	}
	mylog = log.New(hander, "\r\n", log.Ldate|log.Ltime|log.Llongfile)
}
// 主程序
func main() {
	host, _ := config.GetValue("stomp", "host")
	port, _ := config.GetValue("stomp", "port")
	n, e := net.Dial("tcp", net.JoinHostPort(host, port))
	if e != nil {
		fmt.Println(e)
	}
	// STOMP 1.0 的标准头
	//h := stompngo.Headers{}
	// STOMP 1.1 的标准头
	h := stompngo.Headers{"accept-version", "1.1"}
	// @todo 强化网络断开之后重试
	c, e := stompngo.Connect(n, h)
	if e != nil {
		fmt.Println(e)
	}
	// 必须客户端响应才可以删除MQ队列数据
	f := stompngo.Headers{"destination", "/queue/bbg_ordercache", "ack", "client"}
	// 自动删除MQ队列的数据
	//f := stompngo.Headers{"destination", "/queue/bbg_ordercache"}
	s, e := c.Subscribe(f)
	if e != nil {
		fmt.Println(e)
	}
	// 设置通道的容量
	//fmt.Println(c.SubChanCap())
	//c.SetSubChanCap(1)
	for {
		//r := <-s
		//fmt.Println(r)
		run(c, s)
	}
}
// 外部shell脚本调用,成功处理删除相应队列
func run(c *stompngo.Connection, s <-chan stompngo.MessageData) {
	phproot, _ := config.GetValue("php", "phpbin")
	filepath, _ := config.GetValue("php", "filepath")
	params, _ := config.GetValue("php", "params")
	r := <-s
	// 记录结果
	mylog.Println(r)
	order_id := r.Message.Headers.Value("order_id")
	//fmt.Println(order_id)
	cmd := exec.Command(phproot, filepath, params, "order_id", order_id)
	var out bytes.Buffer
	cmd.Stdout = &out
	err := cmd.Start()
	if err != nil {
		log.Fatal(err)
	}
	mylog.Println("Waiting for command to finish...")
	err = cmd.Wait()
	if err != nil {
		mylog.Printf("Command finished with error: %v", err)
	}
	str := out.String()
	//fmt.Println(str)
	if str == "success" {
		e := c.Ack(r.Message.Headers)
		if e != nil {
			mylog.Println(e)
		}
	}
}


本文来自:开源中国博客

感谢作者:chen yuwen

查看原文:golang连接activemq

郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。