
Guide to Reindexing ElasticSearch data input with Logstash

I ran into an issue where I had set up logstash to load data that was numeric as strings. Later, when we wanted to do visualizations with that data, they were off. So I needed to re-index all the data.

Total pain; I hope this guide helps.

If you don’t care about your old data, just:

  • shut down logstash
  • deploy the new logstash filter (with the mutate conversions shown below)
  • close all old indices
  • turn on logstash
  • send some data through to logstash
  • refresh the fields in Kibana (you’ll lose the field popularity counts)

Now, if you do care about your old data, well, that’s a different story. Here are the steps I took:

First, modify the new logstash filter file to use mutate conversions, and deploy it. This takes care of the logstash indexes going forward, but will cause some Kibana pain until you convert all the past indexes (because some indexes will have the affected fields as strings and others as numbers).
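For what it’s worth, the relevant piece of the filter is a mutate convert block along these lines. This is only a sketch: the field names here (response_time, bytes) are made up, and the exact syntax depends on your Logstash version (older versions use the array form convert => [ "response_time", "float" ]). Newer versions of the mutate filter can also convert to boolean.

filter {
  mutate {
    # convert hypothetical string fields to their real types
    convert => {
      "response_time" => "float"
      "bytes"         => "integer"
    }
  }
}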

Install jq (https://stedolan.github.io/jq/manual/v1.4/), which will help you transform your data (jq is magic, I tell you).

Then, for each day/index you care about (logstash-2015.09.22 in this example), follow these steps.


# get the current mapping
curl -XGET 'http://localhost:9200/logstash-2015.09.22/_mapping?pretty=1' > mapping

# back it up
cp mapping mapping.old

# edit mapping, change the types of the fields that are strings to long, float, or boolean.  I used vi
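# for example, a (hypothetical) field that came back as
#   "response_time" : { "type" : "string" }
# would become
#   "response_time" : { "type" : "float" }
# depending on your ES version, you may also need to strip the outer index-name wrapper
# that the _mapping call adds, so that the body you PUT starts at the "mappings" key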

# create a new index with the new mapping 
curl -XPUT 'http://localhost:9200/logstash-2015.09.22-new/' -d @mapping

# find out how many rows there are.  If there are too many, you may want to use scrolled search (see the sketch below).
# I handled indexes as big as 500k documents with the approach below
curl -XGET 'localhost:9200/logstash-2015.09.22/_count'
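# for much larger indexes, the ES 1.x scan/scroll API is the safer route; roughly (the
# size and scroll timeout here are arbitrary, not values from this walkthrough):
#   curl -XGET 'localhost:9200/logstash-2015.09.22/_search?search_type=scan&scroll=5m&size=1000' -d '{"query":{"match_all":{}}}'
#   # then keep fetching pages until no hits come back, passing the _scroll_id each time
#   curl -XGET 'localhost:9200/_search/scroll?scroll=5m' -d '<_scroll_id from the previous response>'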

# if you are modifying an old index, no need to stop logstash, but if you are modifying an index with data currently going to it, you need to stop logstash at this step.

# change size below to be bigger than the count.
curl -XGET 'localhost:9200/logstash-2015.09.22/_search?size=250000' > logstash-2015.09.22.data.orig

# edit data, just get the array of docs without the metadata
sed 's/^[^[]*\[/[/' logstash-2015.09.22.data.orig |sed 's/..$//' > logstash-2015.09.22.data
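# (jq can do the same extraction without the sed surgery, if you prefer:
#   jq '.hits.hits' logstash-2015.09.22.data.orig > logstash-2015.09.22.data )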

# run jq to build a bulk insert compatible json file ( https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html )

# make sure to correct the _index value in the line below
jq -f jq.file logstash-2015.09.22.data | jq -c '{ index: { _index: "logstash-2015.09.22-new", _type: "logs" } }, .' > toinsert

# where jq.file is the file below
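# the result is two lines per document: a metadata line, then the document itself, e.g.
#   {"index":{"_index":"logstash-2015.09.22-new","_type":"logs"}}
#   {"@timestamp":"2015-09-22T00:00:01.000Z","numfield":42, ... }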

# post the toinsert file to the new index
curl -s -XPOST localhost:9200/_bulk --data-binary "@toinsert"; echo

# NOTE: depending on the size of the toinsert file, you may need to split it up into multiple files using head and tail.  
# Make sure you don't split the metadata and data line (that is, each file should have an even number of lines), 
# and that files are all less than 1GB in size.
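# (the split command is an easy way to do this; the line count here is arbitrary, just keep it even)
#   split -l 100000 toinsert toinsert.part.
#   for f in toinsert.part.*; do curl -s -XPOST localhost:9200/_bulk --data-binary "@$f"; echo; done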

# delete the old index
curl -XDELETE 'http://localhost:9200/logstash-2015.09.22'

# add a new alias with the old index's name and pointing to the new index
curl -XPOST localhost:9200/_aliases -d '
{
   "actions": [
       { "add": {
           "alias": "logstash-2015.09.22",
           "index": "logstash-2015.09.22-new"
       }}
   ]
}'

# restart logstash if you stopped it above.
sudo service logstash restart

# refresh the fields in Kibana (you'll lose the field popularity counts)

Here’s the jq file which converts specified string fields to numeric and boolean fields.


#
# this is run with the jq tool for parsing and modifying json

# from https://github.com/stedolan/jq/issues/670
def translate_key(from;to):
  if type == "object" then . as $in
     | reduce keys[] as $key
         ( {};
       . + { (if $key == from then to else $key end)
             : $in[$key] | translate_key(from;to) } )
  elif type == "array" then map( translate_key(from;to) )
  else .
  end;

def turn_to_number(from):
  if type == "object" then . as $in
     | reduce keys[] as $key
         ( {};
       . + { ($key )
             : ( if $key == from then ($in[$key] | tonumber) else $in[$key] end ) } )
  else .
  end;

def turn_to_boolean(from):
  if type == "object" then . as $in
     | reduce keys[] as $key
         ( {};
       . + { ($key )
             : ( if $key == from then (if $in[$key] == "true" then true else false end ) else $in[$key] end ) } )
  else .
  end;

# for example, this converts the value of the given field to a number in each document, and outputs the rest of the object unchanged
# run with: jq -c -f jq.file
.[]|._source| turn_to_number("numfield")
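# several fields can be converted in one pass by chaining the helpers, e.g. (field names here are hypothetical):
# .[]|._source| turn_to_number("numfield") | turn_to_boolean("boolfield")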

Rinse, wash, repeat.

Kibana Visualizations that Change With Browser Reload

I ran into a weird problem with Kibana recently.  We are using the ELK stack to ingest some logs and do some analysis, and when the Kibana webapp was reloaded, it showed different results for certain visualizations, especially averages.  Not all of them, and the results were always close to the actual value, but when you see 4.6 one time and 4.35 two seconds later on a system under light load and for the exact same metric, it doesn’t inspire confidence in your analytics system.

I dove into the issue.  Using the Chrome developer tools, I noticed that the visualizations that were most squirrely were the ones loaded last.  That made me suspicious that there was some failure causing missing data, which caused the average to change. However, the browser API calls weren’t failing; they were succeeding.

I first looked in the ElasticSearch and Kibana configuration files to see if there were any easy timeout configuration values that I was missing, but I didn’t see any.

I then tried to narrow down the issue.  When it was originally noted, we had about 15 visualizations working on about a month’s worth of data.  After a fair bit of URL manipulation, I determined that the discrepancies appeared regularly when there were about 10 visualizations, or when I cut the data down to four hours’ worth.  This gave me more confidence in my theory that some kind of timeout or other resource constraint was the issue. But where was the issue?

I then looked in the ElasticSearch logs.  We have a mapping issue related to a scripted field, which caused a lot of white noise, but I did end up seeing an exception:


org.elasticsearch.common.util.concurrent.EsRejectedExecutionException: rejected execution (queue capacity 1000) on org.elasticsearch.search.action.SearchServiceTransportAction$23@3c26b1f5
        at org.elasticsearch.common.util.concurrent.EsAbortPolicy.rejectedExecution(EsAbortPolicy.java:62)
        at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
        at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
        at org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor.execute(EsThreadPoolExecutor.java:79)
        at org.elasticsearch.search.action.SearchServiceTransportAction.execute(SearchServiceTransportAction.java:551)
        at org.elasticsearch.search.action.SearchServiceTransportAction.sendExecuteQuery(SearchServiceTransportAction.java:228)
        at org.elasticsearch.action.search.type.TransportSearchCountAction$AsyncAction.sendExecuteFirstPhase(TransportSearchCountAction.java:71)
        at org.elasticsearch.action.search.type.TransportSearchTypeAction$BaseAsyncAction.performFirstPhase(TransportSearchTypeAction.java:176)
        at org.elasticsearch.action.search.type.TransportSearchTypeAction$BaseAsyncAction.start(TransportSearchTypeAction.java:158)
        at org.elasticsearch.action.search.type.TransportSearchCountAction.doExecute(TransportSearchCountAction.java:55)
        at org.elasticsearch.action.search.type.TransportSearchCountAction.doExecute(TransportSearchCountAction.java:45)
        at org.elasticsearch.action.support.TransportAction.execute(TransportAction.java:75)
        at org.elasticsearch.action.search.TransportSearchAction.doExecute(TransportSearchAction.java:108)
        at org.elasticsearch.action.search.TransportSearchAction.doExecute(TransportSearchAction.java:43)
        at org.elasticsearch.action.support.TransportAction.execute(TransportAction.java:75)
        at org.elasticsearch.action.search.TransportMultiSearchAction.doExecute(TransportMultiSearchAction.java:62)
        at org.elasticsearch.action.search.TransportMultiSearchAction.doExecute(TransportMultiSearchAction.java:39)
        at org.elasticsearch.action.support.TransportAction.execute(TransportAction.java:75)
        at org.elasticsearch.client.node.NodeClient.execute(NodeClient.java:98)
        at org.elasticsearch.client.FilterClient.execute(FilterClient.java:66)
        at org.elasticsearch.rest.BaseRestHandler$HeadersAndContextCopyClient.execute(BaseRestHandler.java:92)
        at org.elasticsearch.client.support.AbstractClient.multiSearch(AbstractClient.java:364)
        at org.elasticsearch.rest.action.search.RestMultiSearchAction.handleRequest(RestMultiSearchAction.java:66)
        at org.elasticsearch.rest.BaseRestHandler.handleRequest(BaseRestHandler.java:53)
        at org.elasticsearch.rest.RestController.executeHandler(RestController.java:225)
        at org.elasticsearch.rest.RestController.dispatchRequest(RestController.java:170)
        at org.elasticsearch.http.HttpServer.internalDispatchRequest(HttpServer.java:121)
        at org.elasticsearch.http.HttpServer$Dispatcher.dispatchRequest(HttpServer.java:83)
        at org.elasticsearch.http.netty.NettyHttpServerTransport.dispatchRequest(NettyHttpServerTransport.java:329)
        at org.elasticsearch.http.netty.HttpRequestHandler.messageReceived(HttpRequestHandler.java:63)
        at org.elasticsearch.common.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
        at org.elasticsearch.common.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
        at org.elasticsearch.common.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
        at org.elasticsearch.http.netty.pipelining.HttpPipeliningHandler.messageReceived(HttpPipeliningHandler.java:60)
        at org.elasticsearch.common.netty.channel.SimpleChannelHandler.handleUpstream(SimpleChannelHandler.java:88)
        at org.elasticsearch.common.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
        at org.elasticsearch.common.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
        at org.elasticsearch.common.netty.handler.codec.http.HttpChunkAggregator.messageReceived(HttpChunkAggregator.java:145)
        at org.elasticsearch.common.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
        at org.elasticsearch.common.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
        at org.elasticsearch.common.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
        at org.elasticsearch.common.netty.handler.codec.http.HttpContentDecoder.messageReceived(HttpContentDecoder.java:108)
        at org.elasticsearch.common.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
        at org.elasticsearch.common.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
        at org.elasticsearch.common.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
        at org.elasticsearch.common.netty.channel.Channels.fireMessageReceived(Channels.java:296)
        at org.elasticsearch.common.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:459)
        at org.elasticsearch.common.netty.handler.codec.replay.ReplayingDecoder.callDecode(ReplayingDecoder.java:536)
        at org.elasticsearch.common.netty.handler.codec.replay.ReplayingDecoder.messageReceived(ReplayingDecoder.java:435)
        at org.elasticsearch.common.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
        at org.elasticsearch.common.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
        at org.elasticsearch.common.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
        at org.elasticsearch.common.netty.OpenChannelsHandler.handleUpstream(OpenChannelsHandler.java:74)
        at org.elasticsearch.common.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
        at org.elasticsearch.common.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
        at org.elasticsearch.common.netty.channel.Channels.fireMessageReceived(Channels.java:268)
        at org.elasticsearch.common.netty.channel.Channels.fireMessageReceived(Channels.java:255)
        at org.elasticsearch.common.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)
        at org.elasticsearch.common.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108)
        at org.elasticsearch.common.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:337)
        at org.elasticsearch.common.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)

That led me to a StackOverflow post, which in turn led me to run this command on my ES instance:


$ curl -XGET localhost:9200/_cat/thread_pool?v
host            ip           bulk.active bulk.queue bulk.rejected index.active index.queue index.rejected search.active search.queue search.rejected
ip-10-253-44-49 10.253.44.49           0          0             0            0           0              0             0            0               0
ip-10-253-44-49 10.253.44.49           0          0             0            0           0              0             0            0           31589

As I ran that command repeatedly, I saw the search.rejected number getting larger and larger. Clearly I had a misconfiguration or limit around my search thread pool. After looking at the CPU, memory, and I/O on the box, I could tell it wasn’t stressed, so I decided to increase the queue size for this pool. (I thought briefly about modifying the search thread pool size, but an article I found warned me off.)

A GitHub issue helped me understand how to modify the thread pool settings temporarily so I could test the theory.
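The kind of change involved is a dynamic cluster settings update along these lines. This is a sketch rather than the exact command: the setting name (threadpool.search.queue_size) and the value are assumptions based on Elasticsearch 1.x, so check the thread pool documentation for your version:

curl -XPUT 'localhost:9200/_cluster/settings' -d '
{
    "transient": {
        "threadpool.search.queue_size": 2000
    }
}'

The persistent equivalent would be the corresponding threadpool.search.queue_size line in elasticsearch.yml, which is the change mentioned below.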

After making this configuration change, search.rejected went to zero, and the visualization aberrations disappeared. I will modify the elasticsearch.yml file to make this persist across server restarts and re-provisions, but for now the issue seems to be addressed.