Elasticsearch and Cohere

Integrate Cohere with Elasticsearch

Compute embeddings with Cohere and store them for efficient vector or hybrid search in Elasticsearch.
This guide uses a dataset of Wikipedia articles to set up a pipeline for semantic search. It will cover:

  • Creating an Elastic inference processor using Cohere embeddings
  • Creating an Elasticsearch index with embeddings
  • Performing hybrid search on the Elasticsearch index and reranking results
  • Performing basic RAG

To see the full code sample, refer to this notebook.

Prerequisites

This tutorial assumes you have the following:

  • An Elastic Cloud account, available with a free trial
  • A Cohere production API key (if you do not have one, get your API key at this link)
  • Python 3.7 or higher

Note: While this tutorial integrates Cohere with an Elastic Cloud serverless project, you can also integrate with a self-managed Elasticsearch deployment or an Elastic Cloud deployment by switching from the serverless client to the general Elasticsearch language client.

Install the required packages

Install and import the required Python packages:

  • elasticsearch_serverless
  • cohere: ensure you are on version 5.2.5 or later

To install the packages, use the following code:

!pip install elasticsearch_serverless==0.2.0.20231031
!pip install cohere==5.2.5

Import the required packages

from elasticsearch_serverless import Elasticsearch, helpers
import cohere
import json
import requests

Create an Elasticsearch client

In order to create an Elasticsearch client you will need:

  • An endpoint for your cluster, found in the Elastic Serverless dashboard
  • An encoded API key

When creating your API key in the Serverless dashboard, make sure to turn on Control security privileges, and edit the cluster privileges to specify "cluster": ["all"].
Note: you can also create a client using a local or Elastic Cloud cluster. For simplicity, we use Elastic Serverless.

ELASTICSEARCH_ENDPOINT = "elastic_endpoint"
ELASTIC_API_KEY = "encoded_api_key"

client = Elasticsearch(
  ELASTICSEARCH_ENDPOINT,
  api_key=ELASTIC_API_KEY
)

# Confirm the client has connected
print(client.info())
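
Hard-coding keys in a notebook makes them easy to leak. A minimal sketch of reading the same two values from environment variables instead is shown below. The variable names and the `load_credentials` helper are hypothetical and are not part of the Elasticsearch client.

```python
import os


def load_credentials(env=None):
    """Return (endpoint, api_key) read from an environment-style mapping.

    Hypothetical helper: the variable names simply mirror the constants
    used in this guide.
    """
    env = os.environ if env is None else env
    names = ("ELASTICSEARCH_ENDPOINT", "ELASTIC_API_KEY")
    # Collect every missing or empty variable so the error names all of them
    missing = [name for name in names if not env.get(name)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return env[names[0]], env[names[1]]
```

With both variables set in your shell, `ELASTICSEARCH_ENDPOINT, ELASTIC_API_KEY = load_credentials()` could replace the two hard-coded assignments above.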

Build a Hybrid Search Index with Cohere and Elasticsearch

Create an inference endpoint

One of the biggest pain points of building a vector search index is computing embeddings for a large corpus of data. Fortunately, Elastic offers inference endpoints that can be used in ingest pipelines to automatically compute embeddings when bulk indexing operations are performed.

To set up an inference pipeline for ingestion, we must first create an inference endpoint that uses Cohere embeddings. You'll need a Cohere API key for this, which you can find in your Cohere account under the API keys section.

We will create an inference endpoint that uses embed-english-v3.0 with int8 (byte) compression to save on storage.

COHERE_API_KEY = "cohere_api_key"

client.inference.put_model(
    task_type="text_embedding",
    inference_id="cohere_embeddings",
    body={
        "service": "cohere",
        "service_settings": {
            "api_key": COHERE_API_KEY,
            "model_id": "embed-english-v3.0",
            "embedding_type": "int8",
            "similarity": "cosine"
        },
        "task_settings": {},
    },
)
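
To get a feel for the storage saving from int8 embeddings, the arithmetic can be sketched as follows. It assumes 1024-dimensional vectors (the `dims` value used for the index later in this guide) and counts only the raw vector bytes, ignoring any index overhead. The document count is a made-up example.

```python
DIMS = 1024          # embedding dimensions, matching the index mapping in this guide
BYTES_FLOAT32 = 4    # one 32-bit float per dimension
BYTES_INT8 = 1       # one signed byte per dimension


def raw_vector_bytes(num_docs, dims=DIMS, bytes_per_dim=BYTES_FLOAT32):
    """Raw bytes needed to store one vector per document (no index overhead)."""
    return num_docs * dims * bytes_per_dim


num_docs = 1_000_000  # hypothetical corpus size
float_size = raw_vector_bytes(num_docs, bytes_per_dim=BYTES_FLOAT32)
int8_size = raw_vector_bytes(num_docs, bytes_per_dim=BYTES_INT8)

print(f"float32: {float_size / 1e9:.1f} GB")  # float32: 4.1 GB
print(f"int8:    {int8_size / 1e9:.1f} GB")   # int8:    1.0 GB
```

Under these assumptions the int8 vectors take a quarter of the space of float32 vectors.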

Create an inference pipeline

Now that we have an inference endpoint we can create an inference pipeline and processor to use when we ingest documents into our index.

client.ingest.put_pipeline(
    id="cohere_embeddings",
    description="Ingest pipeline for Cohere inference.",
    processors=[
        {
            "inference": {
                "model_id": "cohere_embeddings",
                "input_output": {
                    "input_field": "text",
                    "output_field": "text_embedding",
                },
            }
        }
    ],
)

Let's note a few important parameters from that API call:

  • inference: A processor that performs inference using a machine learning model or service such as Cohere.
  • model_id: Specifies the ID of the inference endpoint to be used. In this example, the model ID is set to cohere_embeddings to match the inference endpoint we created.
  • input_output: Specifies input and output fields.
  • input_field: Name of the field from which the dense_vector representation is created. This must match the field in the data we pass to the processor.
  • output_field: Name of the field that will contain the inference results.
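
To make the effect of `input_output` concrete, here is a small local stand-in for what the processor does to each document at ingest time. It is a simulation only: `fake_embed` is a hypothetical placeholder for the Cohere call, and `apply_inference_processor` is not an Elasticsearch API.

```python
def fake_embed(text):
    # Hypothetical placeholder: a tiny deterministic vector, not a real embedding
    return [len(text) % 128, text.count(" ") % 128]


def apply_inference_processor(doc, input_field="text",
                              output_field="text_embedding", embed=fake_embed):
    """Return a copy of doc with output_field computed from doc[input_field].

    This sketch requires the input field to be present in the document.
    """
    enriched = dict(doc)  # leave the caller's document untouched
    enriched[output_field] = embed(doc[input_field])
    return enriched


doc = {
    "title": "2022 FIFA World Cup",
    "text": "The semi-finals were played on 13 and 14 December.",
}
enriched = apply_inference_processor(doc)
print(sorted(enriched))  # ['text', 'text_embedding', 'title']
```

The original fields are kept and the embedding is added under the output field name, which is why the index mapping needs a matching `text_embedding` field.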

Create index

We will now create an empty index that will be the destination of our documents and embeddings.

client.indices.create(
    index="cohere-wiki-embeddings",
    settings={"index": {"default_pipeline": "cohere_embeddings"}},
    mappings={
        "properties": {
            "text_embedding": {
                "type": "dense_vector",
                "dims": 1024,
                "element_type": "byte",
            },
            "text": {"type": "text"},
            "wiki_id": {"type": "integer"},
            "url": {"type": "text"},
            "views": {"type": "float"},
            "langs": {"type": "integer"},
            "title": {"type": "text"},
            "paragraph_id": {"type": "integer"},
            "id": {"type": "integer"},
        }
    },
)

Insert documents

Let’s now index our wiki dataset.

url = "https://raw.githubusercontent.com/cohere-ai/notebooks/main/notebooks/data/embed_jobs_sample_data.jsonl"
response = requests.get(url)
response.raise_for_status()  # Stop early if the download failed

# Load the response data into a JSON object
jsonl_data = response.content.decode('utf-8').splitlines()

# Prepare the documents to be indexed
documents = []
for line in jsonl_data:
    data_dict = json.loads(line)
    documents.append({
        "_index": "cohere-wiki-embeddings",
        "_source": data_dict,
    })

# Use the bulk endpoint to index
helpers.bulk(client, documents)

print("Done indexing documents into `cohere-wiki-embeddings` index!")

Our index should now be populated with our wiki data and text embeddings for the text field. Ingesting large datasets and creating vector or hybrid search indices is seamless with Elastic.
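
For larger files it can be worth building the bulk actions lazily instead of holding every document in a list. A minimal sketch, assuming the same JSONL layout as above, follows. `iter_actions` is a hypothetical helper name, and skipping blank lines is an added assumption.

```python
import json


def iter_actions(lines, index_name="cohere-wiki-embeddings"):
    """Yield one bulk action per non-empty JSONL line."""
    for line in lines:
        line = line.strip()
        if not line:
            continue  # tolerate blank lines, e.g. a trailing newline
        yield {"_index": index_name, "_source": json.loads(line)}


# Tiny made-up sample standing in for the downloaded file
sample = [
    '{"id": 1, "title": "A", "text": "first"}',
    "",
    '{"id": 2, "title": "B", "text": "second"}',
]
actions = list(iter_actions(sample))
print(len(actions))  # 2
```

Since `helpers.bulk` accepts an iterable of actions, the generator could be passed in place of the `documents` list, for example `helpers.bulk(client, iter_actions(jsonl_data))`.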

Hybrid Search with Elasticsearch and Cohere

Now let’s start querying our index. We will perform a hybrid search query, which means the relevance of each result is computed from both its vector similarity to our query and its keyword similarity. Hybrid search tends to lead to state-of-the-art search results, and Elastic is well suited to offer this.
Here we build a query that will search over the title and text fields using keyword matching, and will search over our text embeddings using vector similarity.

query = "When were the semi-finals of the 2022 FIFA world cup played?"

response = client.search(
    index="cohere-wiki-embeddings",
    size=100,
    knn={
        "field": "text_embedding",
        "query_vector_builder": {
            "text_embedding": {
                "model_id": "cohere_embeddings",
                "model_text": query,
            }
        },
        "k": 10,
        "num_candidates": 50,
    },
    query={
        "multi_match": {
            "query": query,
            "fields": ["text", "title"]
        }
    }
)

raw_documents = response["hits"]["hits"]

# Display the first 10 results
for document in raw_documents[0:10]:
  print(f'Title: {document["_source"]["title"]}\nText: {document["_source"]["text"]}\n')

# Format the documents for ranking
documents = []
for hit in response["hits"]["hits"]:
    documents.append(hit["_source"]["text"])

These are looking pretty good, but we can better consolidate our results using Cohere’s new Rerank v3 model available through Elastic’s inference API.
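
As a rough mental model of the hybrid query above: when a search request contains both `knn` and `query`, Elasticsearch's kNN search documentation describes the two sets of matches as being combined like a boolean OR, with each hit scored as the sum of its kNN and query scores. The sketch below imitates that on made-up scores. The document IDs and numbers are invented, boosts are left out, and `combine_hybrid_scores` is a hypothetical helper, not an Elasticsearch function.

```python
def combine_hybrid_scores(knn_scores, query_scores, size=10):
    """Sum per-document scores from both retrievers and keep the top `size`."""
    combined = {}
    for scores in (knn_scores, query_scores):
        for doc_id, score in scores.items():
            combined[doc_id] = combined.get(doc_id, 0.0) + score
    # Highest combined score first
    ranked = sorted(combined.items(), key=lambda item: item[1], reverse=True)
    return ranked[:size]


knn_scores = {"doc_a": 0.5, "doc_b": 0.25}     # invented vector-similarity scores
query_scores = {"doc_b": 1.0, "doc_c": 0.75}   # invented keyword (BM25-style) scores

print(combine_hybrid_scores(knn_scores, query_scores))
# [('doc_b', 1.25), ('doc_c', 0.75), ('doc_a', 0.5)]
```

Because keyword and vector scores sit on different scales, a plain sum can favor one side, which is one reason a separate reranking step can help.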

Rerank search results with Cohere and Elasticsearch

In order to effectively combine the results from our vector and BM25 retrieval, we can use Cohere's Rerank v3 model through the inference API to provide a final, more precise, semantic reranking of our results.

First, create an inference endpoint with your Cohere API key. Make sure to specify a name for your endpoint, and the model_id of one of the rerank models. In this example we will use Rerank v3.

client.inference.put_model(
    task_type="rerank",
    inference_id="cohere_rerank",
    body={
        "service": "cohere",
        "service_settings":{
            "api_key": COHERE_API_KEY,
            "model_id": "rerank-english-v3.0"
           },
        "task_settings": {
            "top_n": 10,
        },
    }
)

You can now rerank your results using that inference endpoint. Here we will pass in the query we used for retrieval, along with the documents we just retrieved using hybrid search.

The inference service will respond with a list of documents in descending order of relevance. Each document has a corresponding index (reflecting the order the documents were in when sent to the inference endpoint), and if the “return_documents” task setting is True, then the document texts will be included as well.

response = client.inference.inference(
    inference_id="cohere_rerank",
    body={
        "query": query,
        "input": documents,
        "task_settings": {
            "return_documents": False
            }
        }
)

# Reconstruct the input documents based on the index provided in the rerank response
ranked_documents = []
for document in response.body["rerank"]:
  ranked_documents.append({
      "title": raw_documents[int(document["index"])]["_source"]["title"],
      "text": raw_documents[int(document["index"])]["_source"]["text"]
  })

# Print the top 10 results
for document in ranked_documents[0:10]:
  print(f"Title: {document['title']}\nText: {document['text']}\n")
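
The index-based reconstruction can be tried without calling the API. The sketch below uses a hand-written stand-in for the rerank response that carries only the `index` field used above. The hits and the ordering are invented, and `reorder_hits` is a hypothetical helper name.

```python
# Made-up search hits in their original retrieval order
mock_hits = [
    {"_source": {"title": "Doc 0", "text": "zero"}},
    {"_source": {"title": "Doc 1", "text": "one"}},
    {"_source": {"title": "Doc 2", "text": "two"}},
]

# Stand-in for the rerank entries: most relevant first, top 2 only
mock_rerank = [{"index": 2}, {"index": 0}]


def reorder_hits(raw_hits, rerank_entries):
    """Map each rerank entry back to the title and text of the hit it points at."""
    ranked = []
    for entry in rerank_entries:
        source = raw_hits[int(entry["index"])]["_source"]
        ranked.append({"title": source["title"], "text": source["text"]})
    return ranked


reordered = reorder_hits(mock_hits, mock_rerank)
print([d["title"] for d in reordered])  # ['Doc 2', 'Doc 0']
```

Each `index` refers to a position in the list that was sent for reranking, so the original hits must be kept in the same order for the lookup to be valid.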

RAG with Cohere and Elasticsearch

Now that we have ranked our results, we can easily turn this into a RAG system with Cohere's Chat API. We simply pass in the user query and the documents retrieved from Elasticsearch, then print the grounded response, with citations, generated by Cohere's Command R+ model.

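# Create a Cohere client with the API key defined earlier
co = cohere.Client(api_key=COHERE_API_KEY)

response = co.chat(message=query, documents=ranked_documents, model='command-r-plus')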

source_documents = []
for citation in response.citations:
    for document_id in citation.document_ids:
        if document_id not in source_documents:
            source_documents.append(document_id)

print(f"Query: {query}")
print(f"Response: {response.text}")
print("Sources:")
for document in response.documents:
    if document['id'] in source_documents:
        print(f"{document['title']}: {document['text']}")

And our response should look something like this.

Query: When were the semi-finals of the 2022 FIFA world cup played?
Response: The semi-finals of the 2022 FIFA World Cup were played on 13 and 14 December.
Sources:
2022 FIFA World Cup: The semi-finals were played on 13 and 14 December. Messi scored a penalty...

And there you have it! A quick and easy implementation of hybrid search and RAG with Cohere and Elastic.