Kuinka siirtyä Elasticsearch 1.7: stä 6.8: een ilman seisokkeja

Viimeinen tehtäväni BigPandalla oli päivittää olemassa oleva palvelu, joka käytti Elasticsearch-versiota 1.7 uudempaan Elasticsearch-versioon, 6.8.1.

Tässä viestissä jaan kuinka siirrymme Elasticsearch 1.6: sta 6.8: een ankarilla rajoituksilla, kuten nolla seisokilla, ei tietojen menetyksiä ja nolla virheitä. Annan sinulle myös käsikirjoituksen, joka suorittaa siirron puolestasi.

Tämä viesti sisältää 6 lukua (ja yksi on valinnainen):

  • Mitä minä siitä hyödyn? -> Mitkä olivat uudet ominaisuudet, jotka saivat meidät päivittämään versiomme?
  • Rajoitukset -> Mitkä olivat liiketoimintavaatimuksemme?
  • Ongelmanratkaisu -> Kuinka käsittelimme rajoituksia?
  • Siirtyminen eteenpäin -> Suunnitelma.
  • [Valinnainen luku] -> Kuinka käsitelimme surullisen kartoitusräjähdysongelman?
  • Lopuksi -> Kuinka tehdä tiedonsiirto klustereiden välillä.

Luku 1 - Mitä se minulle tarjoaa?

Mitä hyötyjä odotimme ratkaistavan päivittämällä tietovarastomme?

Siellä oli pari syytä:

  1. Suorituskykyyn ja vakauteen liittyvät ongelmat - Meillä oli valtava määrä seisokkeja pitkällä MTTR: ​​llä, mikä aiheutti meille paljon päänsärkyä. Tämä heijastui usein korkeisiin viiveisiin, korkeaan suorittimen käyttöön ja muihin ongelmiin.
  2. Vanhoissa Elasticsearch-versioissa olematon tuki - Elasticsearchista puuttui operatiivista tietoa, ja kun etsimme ulkopuolista konsultointia, meitä kannustettiin siirtymään eteenpäin saadaksemme tukea.
  3. Dynaamiset kartoitukset skeemassamme - Nykyinen skeema Elasticsearch 1.7 -ohjelmassa käytti ominaisuutta, jota kutsutaan dynaamiseksi kartoitukseksi, joka sai klusterimme räjähtämään useita kertoja. Joten halusimme käsitellä tätä ongelmaa.
  4. Huono näkyvyys nykyiselle klusterillemme - Halusimme paremman kuvan konepellin alle ja huomasimme, että myöhemmissä versioissa oli hyvät mittareita vievät työkalut.

Luku 2 - Rajoitukset

  • NOLLA seisonta-ajan siirtyminen - Järjestelmässämme on aktiivisia käyttäjiä, eikä meillä ole varaa järjestelmän toimintahäiriöön siirtymisen aikana.
  • Palautumissuunnitelma - Meillä ei ole varaa tietojen "katoamiseen" tai "vioittamiseen", kustannuksista riippumatta. Joten meidän oli laadittava elvytyssuunnitelma siltä varalta, että siirtymämme epäonnistui.
  • Zero bugs - Emme voineet muuttaa olemassa olevaa hakutoimintoa loppukäyttäjille.

Luku 3 - Ongelmanratkaisu ja suunnitelman ajattelu

Käsitellään rajoituksia yksinkertaisimmista vaikeimpiin:

Zero bugeja

Tämän vaatimuksen täyttämiseksi tutkin kaikkia palvelun mahdollisia pyyntöjä ja mitkä olivat sen tuotokset. Sitten lisäsin yksikötestit tarvittaessa.

Lisäksi lisäsin useita mittareita ( Elasticsearch Indexerja new Elasticsearch Indexer) latenssin, läpimenon ja suorituskyvyn seuraamiseksi, mikä antoi minulle mahdollisuuden vahvistaa, että parannimme niitä vain.

Toipumissuunnitelma

Tämä tarkoittaa, että minun piti käsitellä seuraavaa tilannetta: Asensin uuden koodin tuotantoon ja tavarat eivät toimineet odotetulla tavalla. Mitä voin tehdä asialle sitten

Koska työskentelin palvelussa, joka käytti tapahtumien hankintaa, voisin lisätä toisen kuuntelijan (alla oleva kaavio) ja alkaa kirjoittaa uuteen Elasticsearch-klusteriin vaikuttamatta tuotantotilaan

Nolla seisokkien siirtyminen

Nykyinen palvelu on live-tilassa, eikä sitä voida deaktivoida yli 5–10 minuutin jaksoille. Temppu tämän oikeuden saamiseksi on tämä:

  • Tallenna loki kaikista toiminnoista, joita palvelusi käsittelee (käytämme Kafkaa tuotannossa)
  • Aloita siirtoprosessi offline-tilassa (ja seuraa siirtymää ennen siirtämisen aloittamista)
  • Kun siirto on päättynyt, käynnistä uusi palvelu lokia vastaan ​​tallennetulla siirtymällä ja ota viive kiinni
  • Kun viive on päättynyt, vaihda käyttöliittymä kyselemään uutta palvelua ja olet valmis

Luku 4 - Suunnitelma

Nykyinen palvelumme käyttää seuraavaa arkkitehtuuria (perustuu Kafkan viestien välitykseen):

  1. Event topicsisältää muiden sovellusten tuottamia tapahtumia (esimerkiksi UserId 3 created)
  2. Command topicsisältää käännöksen näiden tapahtumien erityisiksi komentoja käytetään tämän sovelluksen (esimerkiksi: Add userId 3)
  3. Elasticsearch 1.7 - Tietolevyn command Topiclukema Elasticsearch Indexer.

Aiomme lisätä toisen kuluttajan ( new Elasticsearch Indexer) command topic, joka lukee samat tarkat viestit ja kirjoittaa ne rinnakkain Elasticsearch 6.8: n kanssa.

Mistä minun pitäisi aloittaa?

Ollakseni rehellinen, pidin itseäni aloittelijan Elasticsearch-käyttäjänä. Jotta voisin luottaa tämän tehtävän suorittamiseen, minun piti miettiä paras tapa lähestyä tätä aihetta ja oppia se. Muutama asia, joka auttoi, olivat:

  1. Dokumentaatio - Se on mielettömän hyödyllinen resurssi kaikkeen Elasticsearchiin. Käytä aikaa lukemiseen ja tee muistiinpanoja (älä missaa: Mapping ja QueryDsl).
  2. HTTP-sovellusliittymä - kaikki CAT-sovellusliittymän alla. Tämä oli erittäin hyödyllistä virheenkorjaamiseksi asioissa paikallisesti ja nähdäksesi, kuinka Elasticsearch reagoi (älä missaa: klusterin terveys, kissan indeksit, haku, poista hakemisto).
  3. Metrics (❤️) - Ensimmäisestä päivästä alkaen määritimme uuden kiiltävän kojelaudan, jossa oli paljon hienoja mittareita (otettu elasticsearch-exporter-for-Prometheusilta ), jotka auttoivat ja saivat meidät ymmärtämään enemmän Elasticsearchista.

Koodi

Koodikanta käytti kirjastoa nimeltä elast4s ja käytti kirjaston vanhinta julkaisua - todella hyvä syy siirtymiseen! Joten ensimmäinen asia oli vain siirtää versiot ja nähdä, mikä hajosi.

On olemassa muutama taktiikka tämän koodin siirron suorittamiseksi. Valitsemamme taktiikka oli yrittää palauttaa nykyiset toiminnot ensin uudessa Elasticsearch-versiossa kirjoittamatta kaikkia koodeja alusta alkaen. Toisin sanoen nykyisten toimintojen saavuttamiseksi, mutta uudemmalla Elasticsearch-versiolla.

Meille onneksi koodi sisälsi jo melkein täydellisen testauksen kattavuuden, joten tehtävämme oli paljon yksinkertaisempi, ja se kesti noin 2 viikkoa kehitysaikaa.

On tärkeää huomata, että jos näin ei olisi, meidän olisi pitänyt investoida jonkin aikaa kattavuuden täyttämiseen. Vasta sitten pystyisimme siirtymään, koska yksi rajoituksistamme oli olla rikkomatta olemassa olevia toimintoja.

Luku 5 - Kartoitus räjähdysongelma

Kuvailkaamme käyttötapaustamme tarkemmin. Tämä on mallimme:

class InsertMessageCommand(tags: Map[String,String])

Ja esimerkiksi tämän viestin esiintymä olisi:

new InsertMessageCommand(Map("name"->"dor","lastName"->"sever"))

Ja tämän mallin vuoksi meidän täytyi tukea seuraavia kyselyvaatimuksia:

  1. Kysely arvon mukaan
  2. Kysely tagin nimen ja arvon mukaan

Tapa, jolla tämä mallinnettiin Elasticsearch 1.7 -mallissamme, käytti dynaamista mallimallia (koska tagiavaimet ovat dynaamisia, eikä niitä voi mallintaa edistyneissä).

The dynamic template caused us multiple outages due to the mapping explosion problem, and the schema looked like this:

curl -X PUT "localhost:9200/_template/my_template?pretty" -H 'Content-Type: application/json' -d ' { "index_patterns": [ "your-index-names*" ], "mappings": { "_doc": { "dynamic_templates": [ { "tags": { "mapping": { "type": "text" }, "path_match": "actions.tags.*" } } ] } }, "aliases": {} }' curl -X PUT "localhost:9200/your-index-names-1/_doc/1?pretty" -H 'Content-Type: application/json' -d' { "actions": { "tags" : { "name": "John", "lname" : "Smith" } } } ' curl -X PUT "localhost:9200/your-index-names-1/_doc/2?pretty" -H 'Content-Type: application/json' -d' { "actions": { "tags" : { "name": "Dor", "lname" : "Sever" } } } ' curl -X PUT "localhost:9200/your-index-names-1/_doc/3?pretty" -H 'Content-Type: application/json' -d' { "actions": { "tags" : { "name": "AnotherName", "lname" : "AnotherLastName" } } } ' 
 curl -X GET "localhost:9200/_search?pretty" -H 'Content-Type: application/json' -d' { "query": { "match" : { "actions.tags.name" : { "query" : "John" } } } } ' # returns 1 match(doc 1) curl -X GET "localhost:9200/_search?pretty" -H 'Content-Type: application/json' -d' { "query": { "match" : { "actions.tags.lname" : { "query" : "John" } } } } ' # returns zero matches # search by value curl -X GET "localhost:9200/_search?pretty" -H 'Content-Type: application/json' -d' { "query": { "query_string" : { "fields": ["actions.tags.*" ], "query" : "Dor" } } } ' 

Nested documents solution

Our first instinct in solving the mapping explosion problem was to use nested documents.

We read the nested data type tutorial in the Elastic docs and defined the following schema and queries:

curl -X PUT "localhost:9200/my_index?pretty" -H 'Content-Type: application/json' -d' { "mappings": { "_doc": { "properties": { "tags": { "type": "nested" } } } } } ' curl -X PUT "localhost:9200/my_index/_doc/1?pretty" -H 'Content-Type: application/json' -d' { "tags" : [ { "key" : "John", "value" : "Smith" }, { "key" : "Alice", "value" : "White" } ] } ' # Query by tag key and value curl -X GET "localhost:9200/my_index/_search?pretty" -H 'Content-Type: application/json' -d' { "query": { "nested": { "path": "tags", "query": { "bool": { "must": [ { "match": { "tags.key": "Alice" }}, { "match": { "tags.value": "White" }} ] } } } } } ' # Returns 1 document curl -X GET "localhost:9200/my_index/_search?pretty" -H 'Content-Type: application/json' -d' { "query": { "nested": { "path": "tags", "query": { "bool": { "must": [ { "match": { "tags.value": "Smith" }} ] } } } } } ' # Query by tag value # Returns 1 result 

And this solution worked. However, when we tried to insert real customer data we saw that the number of documents in our index increased by around 500 times.

We thought about the following problems and went on to find a better solution:

  1. The amount of documents we had in our cluster was around 500 million documents. This meant that, with the new schema, we were going to reach two hundred fifty billion documents (that’s 250,000,000,000 documents ?).
  2. We read this really good blog post — //blog.gojekengineering.com/elasticsearch-the-trouble-with-nested-documents-e97b33b46194 which highlights that nested documents can cause high latency in queries and heap usage problems.
  3. Testing — Since we were converting 1 document in the old cluster to an unknown number of documents in the new cluster, it would be much harder to track if the migration process worked without any data loss. If our conversion was 1:1, we could assert that the count in the old cluster equalled the count in the new cluster.

Avoiding nested documents

The real trick in this was to focus on what supported queries we were running: search by tag value, and search by tag key and value.

The first query does not require nested documents since it works on a single field. For the latter, we did the following trick. We created a field that contains the combination of the key and the value. Whenever a user queries on a key, value match, we translate their request to the corresponding text and query against that field.

Example:

curl -X PUT "localhost:9200/my_index_2?pretty" -H 'Content-Type: application/json' -d' { "mappings": { "_doc": { "properties": { "tags": { "type": "object", "properties": { "keyToValue": { "type": "keyword" }, "value": { "type": "keyword" } } } } } } } ' curl -X PUT "localhost:9200/my_index_2/_doc/1?pretty" -H 'Content-Type: application/json' -d' { "tags" : [ { "keyToValue" : "John:Smith", "value" : "Smith" }, { "keyToValue" : "Alice:White", "value" : "White" } ] } ' # Query by key,value # User queries for key: Alice, and value : White , we then query elastic with this query: curl -X GET "localhost:9200/my_index_2/_search?pretty" -H 'Content-Type: application/json' -d' { "query": { "bool": { "must": [ { "match": { "tags.keyToValue": "Alice:White" }}] }}} ' # Query by value only curl -X GET "localhost:9200/my_index_2/_search?pretty" -H 'Content-Type: application/json' -d' { "query": { "bool": { "must": [ { "match": { "tags.value": "White" }}] }}} ' 

Chapter 6 — The migration process

We planned to migrate about 500 million documents with zero downtime. To do that we needed:

  1. A strategy on how to transfer data from the old Elastic to the new Elasticsearch
  2. A strategy on how to close the lag between the start of the migration and the end of it

And our two options in closing the lag:

  1. Our messaging system is Kafka based. We could have just taken the current offset before the migration started, and after the migration ended, start consuming from that specific offset. This solution requires some manual tweaking of offsets and some other stuff, but will work.
  2. Another approach to solving this issue was to start consuming messages from the beginning of the topic in Kafka and make our actions on Elasticsearch idempotent — meaning, if the change was “applied” already, nothing would change in Elastic store.

The requests made by our service against Elastic were already idempotent, so we choose option 2 because it required zero manual work (no need to take specific offsets, and then set them afterward in a new consumer group).

How can we migrate the data?

These were the options we thought of:

  1. If our Kafka contained all messages from the beginning of time, we could just play from the start and the end state would be equal. But since we apply retention to out topics, this was not an option.
  2. Dump messages to disk and then ingest them to Elastic directly – This solution looked kind of weird. Why store them in disk instead of just writing them directly to Elastic?
  3. Transfer messages between old Elastic to new Elastic — This meant, writing some sort of “script” (did anyone say Python? ?) that will connect to the old Elasticsearch cluster, query for items, transform them to the new schema, and index them in the cluster.

We choose the last option. These were the design choices we had in mind:

  1. Let’s not try to think about error handling unless we need to. Let’s try to write something super simple, and if errors occur, let’s try to address them. In the end, we did not need to address this issue since no errors occurred during the migration.
  2. It’s a one-off operation, so whatever works first / KISS.
  3. Metrics — Since the migration processes can take hours to days, we wanted the ability from day 1 to be able to monitor the error count and to track the current progress and copy rate of the script.

We thought long and hard and choose Python as our weapon of choice. The final version of the code is below:

dictor==0.1.2 - to copy and transform our Elasticsearch documentselasticsearch==1.9.0 - to connect to "old" Elasticsearchelasticsearch6==6.4.2 - to connect to the "new" Elasticsearchstatsd==3.3.0 - to report metrics 
from elasticsearch import Elasticsearch from elasticsearch6 import Elasticsearch as Elasticsearch6 import sys from elasticsearch.helpers import scan from elasticsearch6.helpers import parallel_bulk import statsd ES_SOURCE = Elasticsearch(sys.argv[1]) ES_TARGET = Elasticsearch6(sys.argv[2]) INDEX_SOURCE = sys.argv[3] INDEX_TARGET = sys.argv[4] QUERY_MATCH_ALL = {"query": {"match_all": {}}} SCAN_SIZE = 1000 SCAN_REQUEST_TIMEOUT = '3m' REQUEST_TIMEOUT = 180 MAX_CHUNK_BYTES = 15 * 1024 * 1024 RAISE_ON_ERROR = False def transform_item(item, index_target): # implement your logic transformation here transformed_source_doc = item.get("_source") return {"_index": index_target, "_type": "_doc", "_id": item['_id'], "_source": transformed_source_doc} def transformedStream(es_source, match_query, index_source, index_target, transform_logic_func): for item in scan(es_source, query=match_query, index=index_source, size=SCAN_SIZE, timeout=SCAN_REQUEST_TIMEOUT): yield transform_logic_func(item, index_target) def index_source_to_target(es_source, es_target, match_query, index_source, index_target, bulk_size, statsd_client, logger, transform_logic_func): ok_count = 0 fail_count = 0 count_response = es_source.count(index=index_source, body=match_query) count_result = count_response['count'] statsd_client.gauge(stat='elastic_migration_document_total_count,index={0},type=success'.format(index_target), value=count_result) with statsd_client.timer('elastic_migration_time_ms,index={0}'.format(index_target)): actions_stream = transformedStream(es_source, match_query, index_source, index_target, transform_logic_func) for (ok, item) in parallel_bulk(es_target, chunk_size=bulk_size, max_chunk_bytes=MAX_CHUNK_BYTES, actions=actions_stream, request_timeout=REQUEST_TIMEOUT, raise_on_error=RAISE_ON_ERROR): if not ok: logger.error("got error on index {} which is : {}".format(index_target, item)) fail_count += 1 statsd_client.incr('elastic_migration_document_count,index={0},type=failure'.format(index_target), 1) else: ok_count += 1 statsd_client.incr('elastic_migration_document_count,index={0},type=success'.format(index_target), 1) return ok_count, fail_count statsd_client = statsd.StatsClient(host='localhost', port=8125) if __name__ == "__main__": index_source_to_target(ES_SOURCE, ES_TARGET, QUERY_MATCH_ALL, INDEX_SOURCE, INDEX_TARGET, BULK_SIZE, statsd_client, transform_item) 

Conclusion

Migrating data in a live production system is a complicated task that requires a lot of attention and careful planning. I recommend taking the time to work through the steps listed above and figure out what works best for your needs.

Nyrkkisääntönä yritä aina vähentää vaatimuksiasi mahdollisimman paljon. Tarvitaanko esimerkiksi seisokkien siirtyminen nollaan? Onko sinulla varaa tietojen menetykseen?

Tietovarastojen päivittäminen on yleensä maraton eikä sprintti, joten hengitä syvään ja yritä nauttia ratsastuksesta.

  • Koko yllä mainittu prosessi kesti noin 4 kuukautta työtä
  • Kaikki tässä viestissä näkyvät Elasticsearch-esimerkit on testattu versioon 6.8.1 nähden