Scrolled queries und Point-In-Time queries mit OpenSearch

Manchmal muss man viel Daten verarbeiten und man kann nicht alles auf einmal handhaben. OpenSearch bietet da Methoden an, um sich die Daten häppchenweise zu laden.

Einleitung

Man verwechselt manchmal OpenSearch mit einer Datenbank, die einem exakte Ergebnisse liefert. Dabei ist es letztendlich ein auf Userexperience optimiertes Suchindex-Framework, das hochverfügbar sein muss. Das begegnet einem an vielen Stellen wieder (Keine ACID Transaktionen, eigenwillige Sortierungen, keine relationalen Verknüpfungen…). Insbesondere die Anzahl Dokumente, die man in einem Index als Ergebnis bekommt, sind beschränkt. Ohne Angabe bekommt man nur 10 geliefert. Schraubt man die Größe des Ergebnisses höher, wird man trotzdem nicht mehr al 10.000 bekommen. Das kann man mit einer globalen Einstellung zwar ändern, aber das wird nicht empfohlen. Man darf nicht vergessen, dass ein Response ziemlich groß wird, den man nicht ohne weiteres sofort verarbeiten kann (das JSON muss ja erstmal geparst werden). Darüber hinaus müssen die OpenSearch Knoten diese großen Sets selbst verarbeiten können.

Aber manchmal muss man eben doch jedes einzelne Dokument durcharbeiten. In REST-Interfaces hat sich zum Beispiel das Paging etabliert, das seitenweise Ergebnisse liefert. Nachteil dieser Methode ist, dass man in der Zwischenzeit Daten hinzufügen kann. Das geht mit transaktionalen Datenbanken gut (außer, dass es schlecht skaliert - wir verschieben damit ggf. die Speicher-Allokation auf einen anderen Host), weil der Read-Snapshot stabil bleibt. OpenSearch ist nicht transaktional. Blättern wir durch die Ergebnisse, kann ein Hinzufügen dazu führen, dass sich Daten der ersten Seiten in das “Seitenfenster” der nachfolgenden Seiten schieben, während wir Seite für Seite auslesen. Dann würden Daten doppelt in dem Ergebnis auftauchen. Beim Löschen würden wir auf einmal Lücken haben (und das nicht mal bemerken).

OpenSearch verwendet deswegen etwas, was man als Snapshots bezeichnen könnte. Zwei APIs gibt es dazu: Scroll (Scroll query) und PIT (Point-In-Time mit search_after).

Beide sind im Prinzip ähnlich, aber es gibt verschiedene Empfehlungen, das eine oder das andere zu nehmen. Ganz grundsätzlich kann die Scroll API weiterhin verwendet werden, wenn man zum Beispiel wirklich alle Dokumente bearbeiten will (neu Indizieren, Verschieben, usw. usf) und vor allem nur eine Abfrage dafür braucht. Die PIT Methode ist flexibler und in der Speicherauslastung ggf. sparsamer und ist dann zu verwenden, wenn viele Anwender größere Ergebnissets ständig (und potenziell parallel) abfragen. PIT ist etwas umständlicher und hat ein paar Einschränkungen. Aber wenn man Frontends zur Datenanalyse schreibt, lohnt es sich, in Point-In-Time Abfragen einzuarbeiten.

Scroll

Scroll bedeutet, dass wir OpenSearch Bescheid geben, dass wir etwas Größeres vorhaben und wir durch diese große Ergebnismenge Stück für Stück durcharbeiten wollen. In Python wird das fast wie eine normale Abfrage gehandhabt:

    search_all={
        "size": 100,
        "query": {
            "match_all": {}
        }
    }
    resp=client.search(index=index_name, body=search_all, scroll='5s')

Mit dem obigen Aufruf suchen wir über einen Index, die query macht ein match_all: {} (also gib uns alles). Gleichzeitig geben wir aber an, nur 100 Dokumente haben zu wollen. Im search Aufruf ist der Parameter scroll='5s' wichtig.

Damit erhalten wir tatsächlich ein Ergebnis, dass 100 Dokumente erhalten kann (wenn mind. so viel in dem Index sind), dazu aber eine _scroll_id als neues Metadaten-Attribut. Intern legt OpenSearch ein Snapshot der Daten an und verknüpft es mit der ID. Dieses Schnapshot ist nun für diese eine Abfrage maximal fünf Sekunden gültig. D.h. wir müssen es jetzt schaffen, die 100 Dokumente in 5 Sekunden zu verarbeiten und können dann mit der Scroll-ID, weitere Seiten abfragen. Trödeln wir (zum Beispiel beim Debuggen), hagelt es Fehler, wenn die Zeit verstrichen ist.

Also machen wir mal was damit:

    scroll_id = resp['_scroll_id']
    while len(resp['hits']['hits']):
        for doc in resp['hits']['hits']:
            docs_listed+=1
            print (json.dumps(doc, indent=4, default=str))

Das verursacht eine Menge Zeichen in der Konsole und ist nicht wirklich sinnvoll, aber egal.

Nachdem wir die 100 Dokumente ausgegeben haben, müssen wir zur nächsten Seite:

    scroll_id = resp['_scroll_id']
    while len(resp['hits']['hits']):
        for doc in resp['hits']['hits']:
            docs_listed+=1
            print (json.dumps(doc, indent=4, default=str))

        # make a request using the Scroll API
        resp = client.scroll(
            scroll_id = scroll_id,
            scroll = '5s' # length of time to keep search context
        )
        scroll_id = resp['_scroll_id']

So lange len(resp['hits']['hits']) ein Ergebnis zeigt und wir nicht mehr als 5s für den Dump brauchen, werden wir bis zum bitteren Ende alle Dokumente auf die Konsole pusten.

Man sieht hier, dass der Aufruf von scroll nur die vorher ermittelte Scroll-ID braucht. D.h. der Snapshot der Daten ist exakt an diese eine Abfrage gebunden. Stellen wir nebenbei andere Abfragen an denselben Index, dann werden diese Abfragen die in der Zwischenzeit erfolgten Änderungen berücksichtigen. Aber unsere Scroll-Abfrage bleibt stur bei seiner Sichtweise vom Zeitpunkt des ersten Aufrufs der ersten Seite. Das ist eigentlich ähnlich wie in RDBMS mit Transaction-Scopes.

Abschließend sollte man die Scroill-ID löschen:

        client.clear_scroll(scroll_id=scroll_id)

Point-In-Time

Benötigt man mehr Flexibilität - also die Notwendigkeit, mehrere unterschiedliche Abfragen zu machen, die alle die gleiche Sicht der Daten liefert - benötigt man Point-In-Time. Das sind auch Snapshots (so wie mit der Scroll API), die aber nicht an eine bestimmte Abfrage gebunden sind.

PIT verwendet man zum Beispiel dann, wenn man erstmal über ein großes Set suchen will, dann davon Dokumente ändern oder löschen will, das wieder zu durchsuchen, um dann wieder was damit anzustellen, bis man mit allem fertig ist. Dann gibt man die PIT ID (und die dazugehörigen Ressourcen) wieder frei. Es wird schnell klar, dass das entweder für komplexe Datenoperationen oder gar für UIs notwendig ist, wo man dem Benutzer eine konsistente Sicht der Daten liefern will.

Allerdings, hat das Point-In-time Snapshot erstmal nichts damit zu tun, dass man jetzt auf einmal mehr als 10.000 Datensätze lesen kann. Das ist weiterhin an die globale Option gekoppelt und beschränkt. PIT erlaubt erstmal nur einen eigenen Scope festzulegen, an denen die Daten nicht durch Seiteneffekte verändert werden.

Aber OpenSearch unterstützt in einem PIT Scope den Parameter search_after in einer Suche. D.h. suche ich zunächst 1.000 Dokumente, werden meine nächsten Aufrufe mit search_after auch über die Grenze von 10.000 Dokumenten kommen. Aber ich erkaufe mir das mit etwas Aufwand. Search After ist eine komplexe Kombination aus einem Sortierkriterium und einem (sortierfähigen) eindeutigen Wert, nach dem ich weitersuchen will. Das können numerische IDs sein oder ein Zeitstempel mit Nanosekunden. Wichtig ist, dass der Wert für jedes Dokument eindeutig ist.

Ich würde gerne ein paar Beispiele dazu geben, aber in Python ist das zurzeit nicht möglich, da die opensearch-py Bibliothek nicht in diesem Teil der API mit OpenSearch 1.3.1 zusammenarbeitet. Da gibt es einen Bug. Wir müssen uns also erstmal auf die Scroll-API beschränken.