项目代码地址 #
项目分析 #
- 读取日志文件
- 解析文件
- 将解析后的数据存储到influxdb库
- 通过grafana实时大屏幕展示
代码解析 #
入口main.go #
package main
import (
"example/pkg/db"
"example/pkg/logger"
"example/pkg/process"
"os"
"os/signal"
)
// main wires the log reader, the parser and the InfluxDB writer together,
// starts each stage in its own goroutine and then blocks until the process
// receives an interrupt signal.
func main() {
	r := logger.NewLogger("log/access.log")
	w := db.NewDB("http://192.168.99.100:8086@admin@12345678@mydb@s")
	logProc := process.NewLogProcess(r, w)

	go logProc.Reader.Read(logProc.Rc)
	go logProc.Process()
	go logProc.Writer.Write(logProc.Wc)

	// signal.Notify does not block when delivering a signal, so the channel
	// must be buffered; with an unbuffered channel an interrupt that arrives
	// before the receive below is ready would be dropped.
	sig := make(chan os.Signal, 1)
	signal.Notify(sig, os.Interrupt)
	<-sig
}
1、创建日志文件对象
2、创建数据库对象
3、创建处理程序对象
except包 #
异常处理包
pkg/except
code.go
package except
// SUCCESS is the status code reported for a successful operation.
const SUCCESS = 1

// ERROR is the status code reported for a failed operation.
const ERROR = -1
pkg/except
message.go
// MsgFlags maps each status code to its short human-readable message.
var MsgFlags = map[int]string{
SUCCESS: "ok",
ERROR: "fail",
}
// GetMsg returns the message registered in MsgFlags for code. Codes that are
// not present in MsgFlags yield the message registered for ERROR.
func GetMsg(code int) string {
	if text, found := MsgFlags[code]; found {
		return text
	}
	return MsgFlags[ERROR]
}
// CheckError panics with err when err is non-nil and does nothing otherwise.
func CheckError(err error) {
	if err == nil {
		return
	}
	panic(err)
}
logger包 #
读取日志文件包
pkg/logger
manage.go
package logger
// NewLogger returns a logger that reads from the file at path p.
func NewLogger(p string) logger {
	var l logger
	l.path = p
	return l
}
逐行读取日志文件,并将读取到的字符串数据写入logprocess的Rc信道
pkg/logger
model.go
package logger
import (
"bufio"
"example/pkg/except"
"io"
"os"
"time"
)
// Reader is the interface for types that feed string data into a channel.
// The logger type in this package implements it.
type Reader interface {
// Read sends the data it reads on rc, one string per send.
Read(rc chan string)
}
// logger reads lines from a log file and implements Reader.
type logger struct {
// path is the location of the log file opened by Read.
path string
}
// Read follows the log file at l.path, tail-style: it seeks to the end of the
// file and then sends every newly appended line on rc with the trailing '\n'
// removed. When no complete line is available it sleeps for 500ms and retries,
// so it only stops by panicking (via except.CheckError) when the file cannot be
// opened or a non-EOF read error occurs.
func (l logger) Read(rc chan string) {
	fp, err := os.Open(l.path)
	except.CheckError(err)
	defer fp.Close()

	// Start at the end of the file so that only new entries are reported.
	// The seek stays best-effort; if it fails, reading starts from the top.
	_, _ = fp.Seek(0, io.SeekEnd)

	rd := bufio.NewReader(fp)
	// pending holds the bytes of a line whose terminating '\n' has not been
	// written yet. ReadBytes returns those bytes together with io.EOF, so they
	// must be kept and joined with the rest of the line on a later read.
	var pending []byte
	for {
		chunk, err := rd.ReadBytes('\n')
		pending = append(pending, chunk...)
		if err == io.EOF {
			time.Sleep(500 * time.Millisecond)
			continue
		}
		except.CheckError(err)
		// err is nil here, so pending ends with the '\n' delimiter.
		rc <- string(pending[:len(pending)-1])
		pending = pending[:0]
	}
}
database包 #
安装influxdb,通过docker安装,注意不能安装latest,要安装1.x版本
D:\site\gostudy> docker pull influxdb:1.7.8
D:\site\gostudy> docker run -d --name influxdb -p 8086:8086 -p 8083:8083 -v c:/docker/influxdb:/var/lib/influxdb influxdb:1.7.8
操作influxdb的包
pkg/db
manage.go
package db
// NewDB returns an influxdb writer configured with the "@"-separated
// connection string d.
func NewDB(d string) influxdb {
	var store influxdb
	store.dns = d
	return store
}
接收logprocess的channel wc,并写入数据库
pkg/db
model.go
package db
import (
"example/pkg/message"
"fmt"
"github.com/influxdata/influxdb1-client/v2"
"log"
"strings"
"time"
)
// Writer is the interface for types that consume parsed messages from a
// channel. The influxdb type in this package implements it.
type Writer interface {
// Write receives messages from wc and persists them.
Write(wc chan *message.Message)
}
// influxdb writes parsed log messages to an InfluxDB server and implements
// Writer.
type influxdb struct {
// dns is the connection string. Write splits it on "@" and uses the parts,
// in order, as address, username, password, database and precision.
// NOTE(review): the name is presumably meant to be "dsn" — confirm.
dns string
}
// Write receives messages from wc and stores each one as a single point in the
// "nginx_log" measurement. It returns once wc is closed.
//
// i.dns must hold five "@"-separated fields: address, username, password,
// database and precision, e.g. "http://host:8086@user@pass@mydb@s". A
// malformed connection string or a client that cannot be created terminates
// the program via log.Fatal. Failures for an individual message are printed
// and that message is skipped.
func (i influxdb) Write(wc chan *message.Message) {
	insli := strings.Split(i.dns, "@")
	// Validate before indexing; a short connection string would otherwise
	// panic with an index-out-of-range error. The fields themselves are
	// deliberately not printed because they include the password.
	if len(insli) < 5 {
		log.Fatalf("invalid influxdb connection string: want 5 \"@\"-separated fields, got %d", len(insli))
	}

	c, err := client.NewHTTPClient(client.HTTPConfig{
		Addr:     insli[0],
		Username: insli[1],
		Password: insli[2],
	})
	if err != nil {
		log.Fatal(err)
	}
	defer c.Close()

	for v := range wc {
		// A fresh batch per message: each message is written on its own.
		bp, err := client.NewBatchPoints(client.BatchPointsConfig{
			Database:  insli[3],
			Precision: insli[4],
		})
		if err != nil {
			// bp is unusable on error, so the message cannot be written.
			fmt.Println("Error: ", err.Error())
			continue
		}

		// NOTE(review): the "Brower" key looks like a typo for "Browser", but
		// it is part of the stored schema and existing queries may rely on it.
		tags := map[string]string{
			"Path":   v.Path,
			"Method": v.Method,
			"Status": v.Status,
			"Scheme": v.Scheme,
			"Brower": v.Browser,
			"Os":     v.Os,
		}
		fields := map[string]interface{}{
			"Address":   v.Address,
			"Size":      v.Size,
			"LocalTime": v.TimeLocal,
		}
		fmt.Println(tags, fields)

		pt, err := client.NewPoint("nginx_log", tags, fields, time.Now())
		if err != nil {
			// Do not add a point that failed to build to the batch.
			fmt.Println("Error: ", err.Error())
			continue
		}
		bp.AddPoint(pt)

		if err := c.Write(bp); err != nil {
			fmt.Println("Error: ", err.Error())
		}
	}
}
message包 #
pkg/message
model.go
package message
import "time"
// Message is one parsed access-log entry. ParseMessage fills every field from
// the capture groups of its log-line pattern.
type Message struct {
// TimeLocal is the timestamp parsed from the bracketed date of the entry.
TimeLocal time.Time
// Size is the second numeric column after the request, converted to an int.
Size int
// Address is the leading run of digits and dots of the entry.
Address string
// Path is the second token of the quoted request.
Path string
// Method is the first token of the quoted request.
Method string
// Scheme is the remainder of the quoted request after the path.
Scheme string
// Status is the first numeric column after the request, kept as text.
Status string
// Os is the content of the first parenthesised group of the user agent.
Os string
// Browser is the trailing part of the user agent after its second
// parenthesised group.
Browser string
}
通过正则表达式解析读取日志文件,分析出日志相应的列
pkg/message
manage.go
package message
import (
"errors"
"regexp"
"strconv"
"time"
)
// messagePattern matches one access-log entry. Its nine capture groups are, in
// order: address, timestamp, method, path, scheme, status, size, the first
// parenthesised part of the user agent and the trailing part of the user agent.
// It is compiled once because ParseMessage runs for every log line.
var messagePattern = regexp.MustCompile(`([\d\.]*)\s+[^ ]+\s+[^ ]+\s+\[([^\]]+)\]\s+\"([^\ ]+)\s+([^ ]+)\s+([^\"]+)\"\s+(\d+)\s+(\d+)\s+\"[^\"]+\"\s+\"[^\(]*\s+\(([^\)]+)\)\s+[^ ]+\s+\([^\)]+\)\s+([^\"]+)`)

// timeLocalLayout is the layout of the bracketed timestamp. The numeric zone
// is parsed ("-0700") rather than matched as a literal "+0800", so entries
// from any UTC offset yield the correct instant.
const timeLocalLayout = "02/Jan/2006:15:04:05 -0700"

// ParseMessage parses a single access-log line m into a Message.
//
// It returns a nil Message and a non-nil error when m does not match the
// expected format, when the timestamp cannot be parsed, or when the size does
// not fit in an int.
func ParseMessage(m string) (*Message, error) {
	ret := messagePattern.FindStringSubmatch(m)
	// A successful match yields the full match plus nine groups.
	if len(ret) != 10 {
		return nil, errors.New("log line does not match the expected access log format")
	}

	t, err := time.Parse(timeLocalLayout, ret[2])
	if err != nil {
		return nil, err
	}
	size, err := strconv.Atoi(ret[7])
	if err != nil {
		return nil, err
	}

	return &Message{
		TimeLocal: t,
		Size:      size,
		Address:   ret[1],
		Path:      ret[4],
		Method:    ret[3],
		Scheme:    ret[5],
		Status:    ret[6],
		Os:        ret[8],
		Browser:   ret[9],
	}, nil
}
process包 #
pkg/process
manage.go
package process
import (
"example/pkg/db"
"example/pkg/logger"
"example/pkg/message"
)
// NewLogProcess builds a LogProcess that reads through r and writes through w.
// Both of its channels, Rc and Wc, are created unbuffered.
func NewLogProcess(r logger.Reader, w db.Writer) *LogProcess {
	lp := new(LogProcess)
	lp.Reader = r
	lp.Writer = w
	lp.Rc = make(chan string)
	lp.Wc = make(chan *message.Message)
	return lp
}
pkg/process
model.go
package process
import (
"example/pkg/db"
"example/pkg/logger"
"example/pkg/message"
"fmt"
"strings"
)
// Reader declares a parameterless Read method.
// NOTE(review): nothing in the code shown here references this interface;
// LogProcess uses logger.Reader instead — confirm whether it is still needed.
type Reader interface {
Read()
}
// LogProcess connects a log reader, the parser and a writer through two
// channels.
type LogProcess struct {
Rc chan string // carries raw data from the reading stage to the parsing stage
Wc chan *message.Message // carries parsed messages from the parsing stage to the writing stage
// Reader is the source that feeds Rc.
Reader logger.Reader
// Writer is the sink that drains Wc.
Writer db.Writer
}
// Process receives raw log lines from l.Rc, parses each one with
// message.ParseMessage and sends the result on l.Wc. Lines that are empty or
// contain only whitespace are skipped; lines that fail to parse are printed
// and dropped. It returns when l.Rc is closed.
func (l *LogProcess) Process() {
	for v := range l.Rc {
		// TrimSpace rather than Trim(v, ""): an empty cutset removes nothing,
		// so blank lines would never be recognised as empty.
		line := strings.TrimSpace(v)
		if line == "" {
			continue
		}
		msg, err := message.ParseMessage(line)
		if err != nil {
			fmt.Println(err)
			continue
		}
		l.Wc <- msg
	}
}
运行 #
配置influxdb #
docker run --name influxdb -p 8086:8086 influxdb:1.7
docker exec -it influxdb bash
influx
> create database mydb //创建数据库
> show databases //显示库
> show measurements //显示表
> CREATE USER admin WITH PASSWORD '12345678' WITH ALL PRIVILEGES //授权
> select * from nginx_log //查询数据
go build main.go