반응형
나는 cloudflare spectrum logpush를 사용하여 Cloud Storage에 log raw data를 저장하고 있으며 python으로 짠 스크립트를 2분만다 스케줄링하여 내려받고 가공하여 logstash로 수집하고 elasticsearch로 처리하고 있다.
Spectrum Log 필드 중 "ClientAsn" 값을 이용하여 AS Name 필드를 Logstash에서 추가 하려고 한다.
Logpush 설정 참고 : [Cloudflare] Spectrum Logpush 설정하기(azure)
Logstash translate을 이용한 필드 추가 참고 : [Cloudflare] ColoCode를 Logstash에서 좌표값 Field 추가 하기
방식은 Python 스크립트에서 내려받은 Log Data를 이용하여 AS Name 을 정의 하고 Logstash에서 translate dictionary로 참조하여 필드 값을 추가 하고자 한다.
ASN 조회에 사용될 툴은 whois로 아래와 같이 조회 할 수 있다.
whois -h whois.cymru.com " -v <ASN or IP>"
참조 : https://www.team-cymru.com/ip-asn-mapping
whois -h whois.cymru.com " -v 8.8.8.8"
AS | IP | BGP Prefix | CC | Registry | Allocated | AS Name
15169 | 8.8.8.8 | 8.8.8.0/24 | US | arin | 2023-12-28 | GOOGLE, US
whois -h whois.cymru.com " -v AS15169"
AS | CC | Registry | Allocated | AS Name
15169 | US | arin | 2000-03-30 | GOOGLE, US
AS Name을 정의할 Python 코드는 아래와 같다.(참고만 하면 좋을 듯하다.)
while True:
# fp_r은 다운로드 받은 log
line = fp_r.readline()
if not line:
break
line_j = json.loads(line)
# as_info 는 정의 된 AS Name
as_vs = as_info.get(line_j['ClientAsn'], "NONE")
if as_vs == "NONE":
# ASN 조회
cmd = 'whois -h whois.cymru.com " -v AS{a}"'.format(a=line_j['ClientAsn'])
result = subprocess.check_output(cmd, shell=True, universal_newlines=True)
a = result.split('\n')
a_key = a[0].split('|')
a_value = a[1].split('|')
info = {}
# AS Name 정의
for i in range(0,len(a_key)):
info[a_key[i].strip()] = a_value[i].strip()
as_info[line_j['ClientAsn']] = info
plo = 1
else:
continue
정의 된 AS Name 은 아래의 형태와 같다.
}
"41717": {
"AS": "41717",
"CC": "HK",
"Registry": "ripencc",
"Allocated": "2017-07-11",
"AS Name": "TELFLYNETWORK, HK"
},
"53848": {
"AS": "53848",
"CC": "US",
"Registry": "arin",
"Allocated": "2018-02-06",
"AS Name": "MRTC-WLBTKY, US"
},
"45700": {
"AS": "45700",
"CC": "ID",
"Registry": "apnic",
"Allocated": "2009-04-06",
"AS Name": "NAPINFO-AS-ID PT. NAP Info Lintas Nusa, ID"
},
"262354": {
"AS": "262354",
"CC": "BR",
"Registry": "lacnic",
"Allocated": "2012-01-11",
"AS Name": "GIGA MAIS FIBRA TELECOMUNICACOES S.A. LIGUE, BR"
}
}
Spectrum Log가 계속 수집 될 동안 AS Name 정의는 계속 진행된다.
다음으로 Logstash translate 설정은 아래와 같다.
filter {
...
translate {
regex => true
dictionary_path => "/path/to/as_info.json"
field => "ClientAsn"
destination => "asn"
}
...
}
elasticsearch 에서 아래와 같이 수집된다.
반응형
'Public Cloud > Cloudflare' 카테고리의 다른 글
[Cloudflare] R2를 rclone으로 다뤄 보기[2] (0) | 2024.06.04 |
---|---|
[Cloudflare] R2를 rclone으로 다뤄 보기[1] (0) | 2024.06.04 |
[Cloudflare-CDN] HTTP Header를 무시하고 모든 콘텐츠 caching 하기 (0) | 2023.06.29 |
[Cloudflare] CDN Cache 응답(Response Status)에 따른 HTTP 통신 파악하기 (0) | 2023.06.29 |
[Cloudflare] ColoCode를 Logstash에서 좌표값 Field 추가 하기 (0) | 2022.12.26 |