본문 바로가기

Public Cloud/Cloudflare

[cloudflare] spectrum log의 Clientasn 필드로 AS Name 필드 추가하여 logstash 로 수집하기

반응형

나는 cloudflare spectrum logpush를 사용하여 Cloud Storage에 log raw data를 저장하고 있으며 python으로 짠 스크립트를 2분만다 스케줄링하여 내려받고 가공하여 logstash로 수집하고 elasticsearch로 처리하고 있다.

 

 

 

 

Spectrum Log 필드 중 "ClientAsn" 값을 이용하여 AS Name 필드를 Logstash에서 추가 하려고 한다.

 

Logpush 설정 참고 : [Cloudflare] Spectrum Logpush 설정하기(azure)
Logstash translate을 이용한 필드 추가 참고 : [Cloudflare] ColoCode를 Logstash에서 좌표값 Field 추가 하기

 

 

 

방식은 Python 스크립트에서 내려받은 Log Data를 이용하여 AS Name 을 정의 하고 Logstash에서 translate dictionary로 참조하여 필드 값을 추가 하고자 한다.

 

 

ASN 조회에 사용될 툴은  whois로 아래와 같이 조회 할 수 있다.

 

whois -h whois.cymru.com " -v <ASN or IP>"

참조 : https://www.team-cymru.com/ip-asn-mapping
whois -h whois.cymru.com " -v 8.8.8.8"
AS      | IP               | BGP Prefix          | CC | Registry | Allocated  | AS Name
15169   | 8.8.8.8          | 8.8.8.0/24          | US | arin     | 2023-12-28 | GOOGLE, US

whois -h whois.cymru.com " -v AS15169"
AS      | CC | Registry | Allocated  | AS Name
15169   | US | arin     | 2000-03-30 | GOOGLE, US

 

 

 

AS Name을 정의할 Python 코드는 아래와 같다.(참고만 하면 좋을 듯하다.)

while True:
    # fp_r은 다운로드 받은 log
    line = fp_r.readline()
    if not line:
        break
    line_j = json.loads(line)
    # as_info 는 정의 된 AS Name 
    as_vs = as_info.get(line_j['ClientAsn'], "NONE")
    if as_vs == "NONE":
        # ASN 조회
        cmd = 'whois -h whois.cymru.com " -v AS{a}"'.format(a=line_j['ClientAsn'])
        result = subprocess.check_output(cmd, shell=True, universal_newlines=True)
        a = result.split('\n')
        a_key = a[0].split('|')
        a_value = a[1].split('|')
        info = {}
        # AS Name 정의
        for i in range(0,len(a_key)):
            info[a_key[i].strip()] = a_value[i].strip()
        as_info[line_j['ClientAsn']] = info
        plo = 1
    else:
        continue

 

 

 

정의 된 AS Name 은 아래의 형태와 같다.

 }
  "41717": {
    "AS": "41717",
    "CC": "HK",
    "Registry": "ripencc",
    "Allocated": "2017-07-11",
    "AS Name": "TELFLYNETWORK, HK"
  },
  "53848": {
    "AS": "53848",
    "CC": "US",
    "Registry": "arin",
    "Allocated": "2018-02-06",
    "AS Name": "MRTC-WLBTKY, US"
  },
  "45700": {
    "AS": "45700",
    "CC": "ID",
    "Registry": "apnic",
    "Allocated": "2009-04-06",
    "AS Name": "NAPINFO-AS-ID PT. NAP Info Lintas Nusa, ID"
  },
  "262354": {
    "AS": "262354",
    "CC": "BR",
    "Registry": "lacnic",
    "Allocated": "2012-01-11",
    "AS Name": "GIGA MAIS FIBRA TELECOMUNICACOES S.A. LIGUE, BR"
  }
}

 

 

 

Spectrum Log가 계속 수집 될 동안 AS Name 정의는 계속 진행된다.

 

다음으로 Logstash translate 설정은 아래와 같다.

filter {
	...
	translate {
    	regex => true
        dictionary_path => "/path/to/as_info.json"
        field => "ClientAsn"
        destination => "asn"
    }
    ...
}

 

 

 

elasticsearch 에서 아래와 같이 수집된다.

반응형