Blog

2017.05.08 - 번역 - Indexing your CSV files with Elasticsearch Ingest Node ...

drscg 2019. 1. 7. 11:43

The idea of this article is to go over the capabilities of some of the features of Ingest node,  which will be combined to parse a Comma Separated Value (CSV) file. We will go over what is an Ingest Node, what type of operations one can perform, and show a specific example starting from scratch to parse and display CSV data using the Elasticsearch and Kibana.

이 글의 목적은 CSV(Comma Separated Value) file를 parse하여 결합되는 Ingest node 의 기능 일부에 대한 내용을 검토하는 것이다. Ingest Node 가 무엇인지, 어떤 유형의 연산을 수행할 수 있는지, 그리고 Elasticsearch와 Kibana를 사용하여, CSV data를 parse하고 표시하는 구체적인 예를 처음부터 보여주겠다.

For that we will use an open catalog of community issues from New York, NY. This CSV file was updated in October 2015 and It consists of 32 fields which draws the complete railway station entrances and exits lists. The goal will be to use the Ingest feature of Elasticsearch in a cluster on Elastic Cloud to parse the data into a structured json, index the data, and use Kibana to build a Map of New York City that includes all this railway stations.

이를 위해, 뉴욕의 지역 사회 이슈에 대한 open catalog 를 사용한다. 이 CSV 파일은 2015.10 에 update되었고, 전체 철도역 입구와 출구 목록을 나타내는 32개의 field로 구성되어 있다. 목표는 Els 의 cluster에서 Elasticsearch의 lngest 기능을 사용하여, data를 json 구조로 parse하고, data를 index하고, Kibana를 사용하여 이 모든 철도역을 포한하는 뉴욕시의 지도를 구성하는 것이다.

We will use the Ingest feature from Elasticsearch instead of Logstash as a way to remove the need of extra software/architecture setup for a simple problem that can be solved just with Elasticsearch.

추가 소프트웨어/아키텍처 설정 없이 Elasticsearch로만 해결할 수 있도록 하기 위해, Logstash 대신 Elasticsearch의 Ingest 기능을 사용한다.

With all this in place, we will be able to visualize the data and answer some questions such as “Where can we find a station with Elevators?”, “Where are most of the Stations located?”, “Which is the most dense area?” among others. Our data will come from a text file, and will turn into insights.

이 모든 작업을 통해, data를 시각화하고, "어느 역에 엘리베이터가 있는가?", "역이 가장 많이 위치한 곳은 어디인가?", "가장 밀집한 지역은 어디인가?" 등의 질문에 답할 수 있다. data는 text file에서 가져온 것으로 시각적으로 볼 수 있게 된다.

Cluster Setup

To start, we are going to use a small Elastic Cloud cluster with 2GB of Memory and 48GB of Disk. We will download the CSV file with this data from the Export to CSV feature included in the following website: https://data.ny.gov/NYC-Transit-Subway.

우선, 2GB memory와 4GB disk를 가진 조그마한 Elastic Cloud를 사용한다. 다음 website(https://data.ny.gov/NYC-Transit-Subway)에 있는 CSV export 기능에서 이 data를 CSV로 내려받는다.

We will use a Linux script that is composed with a simple loop to iterate through the CSV lines and send it to our cluster on Elastic Cloud. Elastic Cloud will give us an endpoint for our Elasticsearch instance. In addition to this, we need to enable Kibana to use the Developer tools and also to build the Dashboard.

간단한 loop로 구성된 linux script를 사용하여 CSV line을 반복하고, Elastic Cloud의 cluster로 보낸다. Elastic Cloud가 Elasticsearch instance의 endpoint이다. 이 외에도 Kibana에서 Developer tool을 사용하고 Dashboard를 구성할 수 있도록 해야 한다.

In order to be able to parse the file, we need to replace all double quotes with single quotes and delete the first line of the file (header) before processing it. This can be done with your prefered tool. Each entry should look like (please note the single quotes):

file을 parse하기 위하여, 처리하기 전에, 모든 double quote를 single quote로 바꿔야 하고, file의 첫번째 line(header)를 제거해야 한다. 이것은 선호하는 tool로 하면 된다. 각 항목은 다음과 같다.(single quote에 유의하자)

BMT,4 Avenue,25th St,40.660397,-73.998091,R,,,,,,,,,,,Stair,YES,,YES,FULL,,FALSE,,FALSE,4th Ave,25th St,SE,40.660323,-73.997952,'(40.660397, -73.998091)','(40.660323, -73.997952)'

Parsing to json

In order to be able to search and build dashboards, we need to parse the plain text into a structured json. For this, we will send the data to elasticsearch using the following script:

dashboard를 검색하고 구성하기 위해서, plain text를 json 구조로 parse해야 한다. 이를 위해, 다음 script를 사용하여 Elasticsearch로 data를 전송한다.

while read f1
do        
   curl -XPOST 'https://XXXXXXX.us-east-1.aws.found.io:9243/subway_info_v1/station' -H "Content-Type: application/json" -u elastic:XXXX -d "{ \"station\": \"$f1\" }"
done < NYC_Transit_Subway_Entrance_And_Exit_Data.csv

This script will read the file (named NYC_Transit_Subway_Entrance_And_Exit_Data.csv), line by line, and send the following initial json to Elasticsearch:

이 script는 file(NYC_Transit_Subway_Entrance_And_Exit_Data.csv)을 line 별로 읽어, 다음과 같은 json을 Elasticsearch로 보낸다.

{
   "station": "BMT,4 Avenue,59th St,40.641362,-74.017881,N,R,,,,,,,,,,Stair,YES,,YES,NONE,,FALSE,,TRUE,4th Ave,60th St,SW,40.640682,-74.018857,'(40.641362, -74.017881)','(40.640682, -74.018857)'"
}

The json contains just one field “station”, with a single line. Once the json is sent to Elasticsearch, we need to take the information and break the station field into multiple fields, each containing a single value of the unstructured line. It is highly recommended to use the simulate API from Elasticsearch to play and develop the pipeline before actually creating it. Initially you should just start with a document and an empty pipeline:

json에는 한 line으로 이루어진 "station" field 하나만 있다. 일단 json이 Elasticsearch로 보내지면, 정보를 가져외 station field를 여러 field로 나누어야 한다. 각 field는 구조화되지 않은 line의 단일 값을 가진다. 실제로 pipeline을 생성하기 전에 개발과 실행을 위해 Elasticsearch의 simulate API 를 사용하는 것이 좋다. 처음에는 document와 빈 pipeline으로 시작해야 한다.

POST _ingest/pipeline/_simulate { "pipeline": {}, "docs": [ { "station": "BMT,4 Avenue,59th St,40.641362,-74.017881,N,R,,,,,,,,,,Stair,YES,,YES,NONE,,FALSE,,TRUE,4th Ave,60th St,SW,40.640682,-74.018857,'(40.641362, -74.017881)','(40.640682, -74.018857)'" } ] }

There are many processors available to process the lines, so you should consider all of them to choose which to use. In this case we will simply use the Grok Processor which allows us to easily define a simple pattern for our lines. The idea of the following processor is to parse using grok and finally remove the field containing the full line:

line을 처리할 수 있는 많은 processor가 있으므로, 사용하려는 모든 processor를 고려해야 한다. 이 경우에는 Grok Processor 를 사용하여 line에 대한 간단한 패턴을 쉽게 정의할 수 있다. 다음 processor의 개념은 grok를 사용하여 parse하고마지막으로 전체 line을 가지는 field를 제거하는 것이다.

POST _ingest/pipeline/_simulate
{
 "pipeline": {
   "description": "Parsing the NYC stations",
   "processors": [
     {
       "grok": {
         "field": "station",
         "patterns": [
           "%{WORD:division},%{DATA:line},%{DATA:station_name},%{NUMBER:location.lat},%{NUMBER:location.lon},%{DATA},%{DATA},%{DATA},%{DATA},%{DATA},%{DATA},%{DATA},%{DATA},%{DATA},%{DATA},%{DATA},%{DATA:entrance_type},%{DATA:entry},%{DATA:exit_only},%{DATA:vending}"
         ]
       }
     },
     {
       "remove": {
         "field": "station"
       }
     }
   ]
 },
 "docs": [
   {
     "_index": "subway_info",
     "_type": "station",
     "_id": "AVvJZVQEBr2flFKzrrkr",
     "_score": 1,
     "_source": {
       "station": "BMT,4 Avenue,53rd St,40.645069,-74.014034,R,,,,,,,,,,,Stair,NO,Yes,NO,NONE,,FALSE,,TRUE,4th Ave,52nd St,NW,40.645619,-74.013688,'(40.645069, -74.014034)','(40.645619, -74.013688)'"
     }
   }
 ]
}
Read Less

This pipeline will produce the following document with structured fields and ready to be indexed into Elasticsearch:

이 pipeline은 구조화된 field를 가지는 다음과 같은 document를 생성하고, Elasticsearch에 index할 준비가 된다.

{
 "division": "BMT",
 "station_name": "53rd St",
 "entry": "NO",
 "exit_only": "Yes",
 "line": "4 Avenue",
 "vending": "",
 "location": {
   "lon": "-74.014034",
   "lat": "40.645069"
 },
 "entrance_type": "Stair"
}

Indexing the documents

Before indexing the document, we need to create an index template that will match the index name that we are going to use. In order to properly be able to do document filtering and geo-location styled queries and dashboard, it is required to setup certain field types which require a specific mapping definition. This can be done by explicitly creating the index in advance, but it is better to use an index template to make it flexible. New indices following this name will be created, automatically, with all this settings and mappings. The name of the index that we will use in this case, will start with subway_info followed by the version (v1 for example). For this, we should use the following template that will match such index name:

document를 index하기 전에, 사용할 index 이름과 일치하는 index template을 생성해야 한다. document filtering과 geo-location 형식의 query와 dashboard를 제대로 수행하려면, 특정한 mapping 정의가 필요한 특정 field type을 설정해야 한다. 이 작업은 index를 명시적으로 미리 생성하여 수행할 수도 있지만, index template을 사용하여 유연하게 하는 것이 더 좋다. 이 모든 설정과 mapping을 가지고 이 이름으로 시작하는 새로운 index가 자동으로 생성된다. 이 경우에 사용되는 index의 이름은 subway_info 다음에 version(예: v1)이 온다. 이를 위해, 그런 index 이름과 일치하는 다음과 같은 template을 사용해야 한다.

PUT _template/nyc_template
{
 "template": "subway_info*",
 "settings": {
   "number_of_shards": 1
 },
 "mappings": {
   "station": {
     "properties": {
       "location": {
         "type": "geo_point"
       },
       "entry": {
         "type": "keyword"
       },
       "exit_only": {
         "type": "keyword"
       }
     }
   }
 }
}

After creating the template, we need to take the ingest pipeline from the simulate step and put the pipeline itself into Elasticsearch, so that we can invoke it at indexing time. The command to put the ingest pipeline should look like the following:

template을 생성한 후에는, simulate 단계에서 ingest pipeline을 가져와, pipeline 자체를 Elasticsearch에 넣어야, index시에 호출할 수 있다. ingest pipeline을 넣는 command는 다음과 같다.

PUT _ingest/pipeline/parse_nyc_csv
{
 "description": "Parsing the NYC stations",
 "processors": [
   {
     "grok": {
       "field": "station",
       "patterns": [
         "%{WORD:division},%{DATA:line},%{DATA:station_name},%{NUMBER:location.lat},%{NUMBER:location.lon},%{DATA},%{DATA},%{DATA},%{DATA},%{DATA},%{DATA},%{DATA},%{DATA},%{DATA},%{DATA},%{DATA},%{DATA:entrance_type},%{DATA:entry},%{DATA:exit_only},%{DATA:vending}"
       ]
     }
   },
   {
     "remove": {
       "field": "station"
     }
   }
 ]
}

Now, we just need to read the file and sent the data to Elasticsearch using the  pipeline we created. The pipeline definition is configured in the URL:

이제, file을 읽고, 생성한 pipeline을 사용하여 Elasticsearch로 data를 보내야 한다. pipeline 정의는 URL에 구성된다.

while read f1
do        
  curl -XPOST 'https://XXXXXXX.us-east-1.aws.found.io:9243/subway_info_v1/station?pipeline=parse_nyc_csv' -H "Content-Type: application/json" -u elastic:XXXX -d "{ \"station\": \"$f1\" }"  
done < NYC_Transit_Subway_Entrance_And_Exit_Data.csv

Let it run for a while and it will create each document one at a time. Note that here we are using the index API and not the bulk API. In order to make it faster and more robust for production use cases, we recommend you use the bulk API to index these documents. At the end of the ingest process you will end up with 1868 stations in the subway_info_v1 index.

잠시 동안 실행해 보면, 한 번에 하나씩 각 document가 생성된다. 여기에서는 bulk API가 아닌 index API를 사용한다는 점에 유의하자. 제품에서 사용하는 경우에는 더 빠르고 강력하게 만들기 위해, 이들 document를 index하기 위해 bulk API를 사용하는 것이 좋다. ingest process가 끝나면, subway_info_v1 index에 1868개의 역이 나온다.

Building the Dashboard

In order to build the dashboard, first we need to add the index pattern to Kibana. For that, just go to Management and add the index pattern subway_info_v1. You should unclick the “Index contains time-based events” option as it is not time series data (our data doesn’t contain a date-time field).

dashboard를 만들기 위해, 먼저 Kibana에서 index pattern을 추가해야 한다. 이를 위해, Management로 가서, index pattern에 subway_info_v1을 추가하자. 시계열 data가 아니기 때문(data에 날짜/시간 field가 포함되어 있지 않다)에 “Index contains time-based events” option을 해제해야 한다.

img1.png

After this we can go and first create our Tile Map visualization showing all the subway stations that we have in this dataset for New York City. For that, we need to go to Visualizations and choose the Tile Map type. By choosing the geolocation field and the cardinality of the station name, we get an easy and quick view of the existing stations.

그 후에, 이 data 집합에 있는 뉴욕시의 모든 지하철 역을 보여주는 Tile Map 시각화를 만들 수 있다. 이를 위해, Visualizations로 가서, Tile Map 유형을 선택한다. geolocation field와 역 이름의 cardinality를 선택함으로써, 기존의 역을 쉽고 빠르게 볼 수 있다.

img2.png

By adding some additional Visualizations, such as a Saved Search and type of entrance, we can easily build a tool to search for specific Subway stations in New York city.

Saved Search와 입구 유형 같은 몇 가지 추가적인 Visualizations를 추가함으로써, 뉴욕시의 특정 지하철 역을 search하는 tool을 쉽게 만들 수 있다.

img3.png

As we can see here, downtown Manhattan is the area with the most Subway Stations. With Kibana you can select a specific rectangle and also filter on a value such as “Elevator”. For example from the 356 Stations, around 200 are located in Manhattan, and 36 stations have elevators.

여기서 볼 수 있듯이, Manhattan 시내는 가장 많은 지하철역이 있는 지역이다. Kibana를 사용하면, 특정 사각형을 선택할 수 있고 "Elevator" 같은 값으로 filtering할 수 있다. 예를 들어, 356개 역에서, 약 200개 정도가 Manhattan에 있고 36개의 역이 elevator를 가지고 있다.

img4.png

Try it out with your Dataset!

As you can see, starting from scratch with a CSV file is very simple. You can run a trial on an Elasticsearch Cloud (cloud.elastic.co) instance and ingest your data with just little work. Ingest Node is the tool that will allow you to convert unstructured data into structured data, and finally be able to create powerful visualizations with Kibana. Going from Zero to Hero in Elasticsearch, is as easy as spinning up a cluster. Try it for yourself!

보시다시피, CSV 파일로 처음부터 시작하는 것은 매우 간단하자. Elasticsearch Cloud() instance에서 평가판을 실행하고, 약간의 작업으로 data를 ingest할 수 있다. Ingest Node는 구조화되지 않은 data를 구조화된 data로 변경하고, 마지막으로 Kibana를 사용하여 강력한 시각화를 만들 수 있는 tool이다. Elasticsearch에서 처음부터 끝까지 하는 것은 매우 쉽다. 직접 해 보자.

원문 : Indexing your CSV files with Elasticsearch Ingest Node