layout: post
categories: rocketmq golang
title: 读取数据发送到MQ
date: 2019-08-03 14:01:12 +0800
description: 监控文件夹变动产生新文件读取该文件内容每一行都发送到RocketMQ

keywords: RocketMQ Golang

数据通过FTP上传到服务器要求将数据发送到RocketMQ原版本用Java写的,TPS上不去资源消耗大等问题。

缘由

核心代码介绍

package main  
  
import (  
	"fmt"  
	"github.com/astaxie/beego/logs"  
	"github.com/zjykzk/rocketmq-client-go/log"  
	"github.com/zjykzk/rocketmq-client-go/message"  
	"github.com/zjykzk/rocketmq-client-go/producer"  
	"math/rand"  
	"os"  
	"runtime"  
	"strings"  
	"sync/atomic"  
	"time"  
	"unsafe"  
)  
  
type Client struct {  
	P            *producer.Producer  
	Group        string  
	NamesrvAddrs string  
	DataChan     chan Message  
}  
  
func newLogger(filename string) (log.Logger, error) {  
	file, err := os.OpenFile(filename, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)  
	if err != nil {  
		println("create file error", err.Error())  
		return nil, err  
	}  
  
	logger := log.New(file, "", log.Ldefault)  
	logger.Level = log.Ldebug  
  
	return logger, nil  
}  
  
func (cli *Client) SendMsg(stati *statiBenchmarkProducerSnapshot) {  
	now := time.Now()  
	var count = 1;  
	for messageBody := range cli.DataChan {  
		m := &message.Message{Topic: messageBody.topic, Body: []byte(messageBody.line)}  
  
		r, err := cli.P.SendSync(m)  
		if err != nil {  
			logs.Debug("Send with sync error:%s\n", err)  
			continue  
		}  
  
		// 取模  
		//var BatchLines = []message.Data{};  
		//BatchLines = append(BatchLines, message.Data{Body: []byte(messageBody.line)})  
		//if count%defaultSendLimitbatch == 0 {  
		//  
		//	r, err3 := client.SendBatchSync(&message.Batch{Topic: messageBody.topic, Datas: BatchLines})  
		//	if err3 != nil {  
		//		logs.Debug("SendWithBatchSync error:%s\n", err3)  
		//		continue  
		//	}  
		//	logs.Debug(fmt.Sprintf("BatchSendMessageResult: %s FileName = %s  Process = %f%%  ProcessCount = %d , TotalCount = %d", r.OffsetID, messageBody.FileName, float32(messageBody.ProcessCount)/float32(messageBody.TotalCount)*100, messageBody.ProcessCount, messageBody.TotalCount))  
		//}  
  
		//selector  
		//r, err2 := client.SendSyncWithSelector(m, messageQueueSelector{}, count)  
		//if err2 != nil {  
		//	logs.Debug("SendWithSelector error:%s\n", err2)  
		//	continue  
		//}  
  
		if r.Status == producer.OK {  
			atomic.AddInt64(&stati.receiveResponseSuccessCount, 1)  
			atomic.AddInt64(&stati.sendRequestSuccessCount, 1)  
			currentRT := int64(time.Since(now) / time.Millisecond)  
			atomic.AddInt64(&stati.sendMessageSuccessTimeTotal, currentRT)  
			prevRT := atomic.LoadInt64(&stati.sendMessageMaxRT)  
			for currentRT > prevRT {  
				if atomic.CompareAndSwapInt64(&stati.sendMessageMaxRT, prevRT, currentRT) {  
					break  
				}  
				prevRT = atomic.LoadInt64(&stati.sendMessageMaxRT)  
			}  
			if count%1000 == 0 {  
				logs.Debug(fmt.Sprintf("SendMessageResultQueueOffset : %d FileName = %s  Process = %f%%  ProcessCount = %d , TotalCount = %d", r.QueueOffset, messageBody.FileName, float32(messageBody.ProcessCount)/float32(messageBody.TotalCount)*100, messageBody.ProcessCount, messageBody.TotalCount))  
			}  
		}  
		count++;  
	}  
	logs.Debug("DONE")  
}  
  
func NewMQSender(groupId string,nameSvr string) (cli *Client, err error) {  
	cli = &Client{  
		Group:groupId,  
		NamesrvAddrs:nameSvr,  
		DataChan:data,  
	}  
	stati := statiBenchmarkProducerSnapshot{}  
	snapshots := produceSnapshots{cur: &stati}  
  
  
	logger, err := newLogger("producer.log")  
	if err != nil {  
		logs.Debug("new logger of producer.loge error:%s\n", err)  
		return  
	}  
  
	client := producer.New(groupId, strings.Split(nameSvr, ";"), logger)  
  
	err = client.Start()  
	if err != nil {  
		logs.Debug("start producer error: ", err)  
		return  
	}  
	//defer client.Shutdown()  
  
	cli.P = client  
	// 根据CPU来协商启动协程数  
	for i := 0; i < runtime.GOMAXPROCS(runtime.NumCPU()); i++ {  
		go func() {  
			waitgroup.Add(1)  
			cli.SendMsg(&stati)  
			waitgroup.Done()  
		}()  
	}  
	// snapshot  
	go func() {  
		waitgroup.Add(1)  
		defer waitgroup.Done()  
		ticker := time.NewTicker(time.Second)  
		for {  
			select {  
			case <-ticker.C:  
				snapshots.TakeSnapshot()  
			}  
		}  
	}()  
	// print statistic  
	go func() {  
		waitgroup.Add(1)  
		defer waitgroup.Done()  
		ticker := time.NewTicker(time.Second * 10)  
		for {  
			select {  
			case <-ticker.C:  
				snapshots.TrintStati()  
			}  
		}  
	}()  
	return  
}  
  
type messageQueueSelector struct{}  
  
func (s messageQueueSelector) Select(mqs []*message.Queue, m *message.Message, arg interface{}) *message.Queue {  
	orderID := arg.(int)  
	return mqs[orderID%len(mqs)]  
}  
  
const (  
	letterBytes   = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"  
	letterIdxMask = 1<<6 - 1 // All 1-bits, as many as 6  
)  
  
// 随机  
var src = rand.NewSource(time.Now().UnixNano())  
  
func RandStringBytesMaskImprSrc(n int) string {  
	b := make([]byte, n)  
	// A src.Int63() generates 63 random bits, enough for 10 characters!  
	for i, cache, remain := n-1, src.Int63(), 10; i >= 0; {  
		if remain == 0 {  
			cache, remain = src.Int63(), 10  
		}  
		b[i] = letterBytes[int(cache&letterIdxMask)%len(letterBytes)]  
		i--  
		cache >>= 6  
		remain--  
	}  
	return *(*string)(unsafe.Pointer(&b))  
}  
  
  

转载请注明出处,本文采用 CC4.0 协议授权

   留言:

验证成功!
请输入内容!
验证成功!
请输入内容!