Golang操作ElasticSearch

https改造剩下最后一些不好啃的域名(全站HTTPS改造注意事项及解决方案),包括支付、微服务网关等,服务太重要,需要排查http请求的调用方,通过公司es来查nginx代理的日志,但是最后发现数据量略大,导不出来,只能调接口了。golang中操作elastic主要有以下两个库:

olivere/elastic

go-elasticsearch

> Elasticsearch Guide | ES Python API Documentation

olivre/elastic这个库封装的比官方的好,先看这个:

package main

import (
   "context"
   "fmt"
   "github.com/olivere/elastic/v7"
   "log"
   "os"
   "reflect"
   "time"
)

// 查询结果定义
type QueryInfo struct {
   Domain string `json:"domain"`     // 查询域名
   ClientIP string `json:"client_ip"` // 来源IP
   Scheme string `json:"scheme"`     // 协议类型
   Referer string `json:"referer"`       // 请求来源
   Url string `json:"url"`             // 请求url
   HttpCode string `json:"http_code"` // 返回状态码
   Method string `json:"method"`     // 请求方法
}

// 初始化变量
var (
   queryInfo QueryInfo
   indexName = "chegva_ngx_proxy_log"
   // es集群地址列表:https://sgp.api.es.che/product/es/cluster-list
   apiAddresses = []string{"http://sgp.api.es.che"} // elasticsearch 服务地址,多个服务地址使用逗号分隔
   username, password = "anzhihe", "anzhihe"
   client *elastic.Client
   res *elastic.SearchResult
   err error
   ctx context.Context
)

func init() {
   // 连接es集群
   client, err = elastic.NewClient(
      elastic.SetURL(apiAddresses...),
      elastic.SetBasicAuth(username, password),
      // 允许您指定弹性是否应该定期检查集群(默认为真)
      elastic.SetSniff(false),
      // 设置监控检查时间间隔
      elastic.SetHealthcheckInterval(10*time.Second),
      // 设置错误日志输出
      elastic.SetErrorLog(log.New(os.Stderr, "ELASTIC ", log.LstdFlags)),
      // 设置info日志输出
      elastic.SetInfoLog(log.New(os.Stdout, "", log.LstdFlags)))

   if err != nil {
      fmt.Println("连接失败:%v\n", err)
      panic(err)
   }

   fmt.Println("连接成功", client)
}

// 查询日志
func Search() {
   // 执行ES请求需要提供一个上下文对象
   ctx = context.Background()

   // 创建bool组合查询;must条件类似SQL的and;must_not与must作用相反;should类似SQL中的or,只要匹配区中一个就行
   //boolQuery := elastic.NewBoolQuery().Must()

   // 创建查询
   // term精确查询;terms多值查询类似SQL的in查询
   domainQuery := elastic.NewTermQuery("domain", "api.pay.chegva.com")
   // Match查询,匹配单个字段
   urlQuery := elastic.NewMatchQuery("url", "/pay/")
   // Ranges范围查询
   timeQuery := elastic.NewRangeQuery("timestamp").
      Gte("2022-02-22").
      Lte("now").
      Format("yyyy-MM-dd")

   // Filter 多重条件筛选
   boolSearch := elastic.NewBoolQuery().
      Filter(domainQuery).
      Filter(urlQuery).
      Filter(timeQuery)

   // 设置bool查询的must条件, 组合了两个子查询
   //boolQuery.Must(domainQuery, timeQuery)

   res, err = client.Search().
      Index(indexName).
      Query(boolSearch).
      From(0).Size(10). // 拿前10个结果,默认不能大于10000
      Pretty(true).
      Do(ctx) // 执行
      
   if err != nil {
      panic(err)
   }
   
   total := res.TotalHits()
   fmt.Printf("Found %d results\n", total)
   if total > 0 {
      printQueryInfo(res, err)
   } else {
      fmt.Println("Not found!")
   }
}

// 打印查询到的输出
func printQueryInfo(res *elastic.SearchResult, err error) {
   if err != nil {
      print(err.Error())
      return
   }
   // 通过Each方法,将es结果的json结构转换成struct对象
   for _, item := range res.Each(reflect.TypeOf(queryInfo)) { //从搜索结果中取数据的方法
      if q, ok := item.(QueryInfo); ok {
         fmt.Printf("%#v\n", q)
      }
   }
}

func main() {
   Search()
}

输出结果:

连接成功 http://sgp.api.es.che [dead=false,failures=0,deadSince=<nil>]
2022/02/22 11:20:30 POST http://sgp.api.es.che/chegva_ngx_proxy_log/_search?pretty=true [status:200, request:0.283s]
Found 9795 results
main.QueryInfo{Domain:"api.pay.chegva.com", ClientIP:"110.96.00.11", Scheme:"http", Referer:"https://hk.chegva.com/", Url:"/pay/000/79", HttpCode:"302", Method:"POST"}
main.QueryInfo{Domain:"api.pay.chegva.com", ClientIP:"112.74.233.44", Scheme:"http", Referer:"https://chegva.com/tw/buy/confirm/110", Url:"/pay/111/59", HttpCode:"200", Method:"POST"}

官方库go-elasticsearch使用:

package main

import (
   "bytes"
   "encoding/json"
   "fmt"
   "github.com/elastic/go-elasticsearch/v7"
   "log"
)

func main() {
   cfg := elasticsearch.Config{
      Addresses: []string{
         "http://sgp.api.es.che",
      },
      Username: "anzhihe",
      Password: "anzhihe",
   }
   var buf bytes.Buffer

   query := map[string]interface{}{
      "query": map[string]interface{}{
         "match": map[string]interface{}{
            "domain": "api.pay.chegva.com",
         },
      },
   }
   if err := json.NewEncoder(&buf).Encode(query); err != nil {
      log.Fatalf("Error encoding query: %s", err)
   }
   es, _ := elasticsearch.NewClient(cfg)
   log.Println(es.Info())
   result, err := es.Search(es.Search.WithIndex("chegva_ngx_proxy_log"), es.Search.WithBody(&buf))
   if err != nil {
      fmt.Println(err)
   }
   fmt.Println(result)
}

但是发现返回的scheme字段值都是http,原因是业务nginx代理前面还接了一层slb,http和https流量先打到slb上,slb转到后端nginx代理都是走的http,所以查出来的数据筛不出http流量请求来源ip,脚本白写了,坑~!


参考:

anzhihe 安志合个人博客,版权所有 丨 如未注明,均为原创 丨 转载请注明转自:https://chegva.com/5151.html | ☆★★每天进步一点点,加油!★★☆ | 

您可能还感兴趣的文章!

发表评论

电子邮件地址不会被公开。 必填项已用*标注