End-to-end RAG using Elasticsearch and Cohere

Learn how to use the Inference API for semantic search and use Cohere's APIs for RAG.

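For this example, you will need:

  • An Elastic Cloud serverless project, with its endpoint URL and an encoded API key
  • A Cohere account with a production API key
  • A Python environment where you can install the elasticsearch_serverless and cohere packages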

Note: While this tutorial integrates Cohere with an Elastic Cloud serverless project, you can also use a self-managed Elasticsearch deployment or an Elastic Cloud deployment by pointing the Elasticsearch client at that deployment's endpoint instead of a Serverless endpoint.

If you don't have an Elastic Cloud deployment, sign up for a free trial and request access to Elastic Serverless.

To get started, we'll need to connect to our Elastic Serverless deployment using the Python client.

First we need to pip install the following packages:

  • elasticsearch_serverless
  • cohere

pip install elasticsearch_serverless cohere

After installing, find your endpoint URL in the Serverless dashboard and create your API key.

Next, we import the modules we need. 🔐 NOTE: getpass lets us prompt the user for credentials without echoing them to the terminal. The values are still held in memory for the lifetime of the Python session.

from elasticsearch_serverless import Elasticsearch, helpers
from getpass import getpass
import cohere
import json
import requests

Now we can instantiate the Python Elasticsearch client.

First we prompt the user for their endpoint and encoded API key.
Then we create a client object, an instance of the Elasticsearch class.

When creating your Elastic Serverless API key, make sure to turn on Control security privileges and edit the cluster privileges to specify "cluster": ["all"].

ELASTICSEARCH_ENDPOINT = getpass("Elastic Endpoint: ")
ELASTIC_API_KEY = getpass("Elastic encoded API key: ") # Use the encoded API key

client = Elasticsearch(
  ELASTICSEARCH_ENDPOINT,
  api_key=ELASTIC_API_KEY
)

Confirm that the client has connected with this test:

print(client.info())
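
If you run this tutorial as a script rather than in a notebook, prompting every time can get tedious. The sketch below is one possible approach, not part of the original tutorial: it reads a credential from an environment variable and only falls back to getpass when the variable is unset. The helper name read_secret and the environment variable names are assumptions made for illustration.

```python
import os
from getpass import getpass


def read_secret(env_var: str, prompt: str) -> str:
    """Return the value of env_var if it is set, otherwise prompt without echo."""
    value = os.environ.get(env_var)
    if value:
        return value
    return getpass(prompt)


# Hypothetical variable names; use whatever your environment defines.
# ELASTICSEARCH_ENDPOINT = read_secret("ELASTICSEARCH_ENDPOINT", "Elastic Endpoint: ")
# ELASTIC_API_KEY = read_secret("ELASTIC_API_KEY", "Elastic encoded API key: ")
```

Either way, the credential ends up in a Python variable, so treat the session as sensitive.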

Create the inference endpoint

Let's create the inference endpoint by using the Create inference API.

You'll need a Cohere API key for this, which you can find in your Cohere account under the API keys section. A production key is required to complete the steps in this notebook because the Cohere free trial API usage is rate limited.

COHERE_API_KEY = getpass("Enter Cohere API key:  ")

client.options(ignore_status=[404]).inference.delete_model(inference_id="cohere_embeddings")

client.inference.put_model(
    task_type="text_embedding",
    inference_id="cohere_embeddings",
    body={
        "service": "cohere",
        "service_settings": {
            "api_key": COHERE_API_KEY,
            "model_id": "embed-english-v3.0",
            "embedding_type": "int8",
            "similarity": "cosine"
        },
        "task_settings": {},
    },
)
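
The endpoint above asks for int8 embeddings compared with cosine similarity. To make that concrete, here is a small pure-Python illustration of cosine similarity over integer vectors. It only shows what the measure means; it is not how Elasticsearch or Cohere compute it internally, and the sample vectors are made up.

```python
import math
from typing import Sequence


def cosine_similarity(a: Sequence[int], b: Sequence[int]) -> float:
    """Cosine of the angle between two equal-length vectors."""
    if len(a) != len(b):
        raise ValueError("vectors must have the same length")
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        raise ValueError("cosine similarity is undefined for zero vectors")
    return dot / (norm_a * norm_b)


# Made-up int8-range vectors (values between -128 and 127).
query_vec = [12, -7, 100, 0]
same_direction = [6, -3, 50, 0]   # roughly the same direction, smaller magnitude
opposite = [-12, 7, -100, 0]      # exactly the opposite direction

print(cosine_similarity(query_vec, same_direction))
print(cosine_similarity(query_vec, opposite))
```

Cosine similarity depends on direction rather than magnitude, which is why the first score is close to 1 even though the second vector is about half the length.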

Create an ingest pipeline with an inference processor

Create an ingest pipeline with an inference processor by using the put_pipeline method. Set model_id to the ID of the inference endpoint created above so that every document passing through the pipeline is embedded by that endpoint.

client.options(ignore_status=[404]).ingest.delete_pipeline(id="cohere_embeddings")

client.ingest.put_pipeline(
    id="cohere_embeddings",
    description="Ingest pipeline for Cohere inference.",
    processors=[
        {
            "inference": {
                "model_id": "cohere_embeddings",
                "input_output": {
                    "input_field": "text",
                    "output_field": "text_embedding",
                },
            }
        }
    ],
)

Let's note a few important parameters from that API call:

  • inference: A processor that performs inference using a machine learning model.
  • model_id: Specifies the ID of the inference endpoint to be used. In this example, the model ID is set to cohere_embeddings.
  • input_output: Specifies input and output fields.
  • input_field: Field name from which the dense_vector representation is created.
  • output_field: Field name which contains inference results.
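
To see how input_field and output_field relate, the following sketch mimics the effect of the processor on a single document in plain Python. The function fake_embed is a stand-in invented for this example and does not produce real embeddings; in the actual pipeline Elasticsearch calls the Cohere endpoint instead.

```python
from typing import Any, Callable, Dict, List


def fake_embed(text: str) -> List[int]:
    """Stand-in for the inference endpoint. NOT a real embedding."""
    return [len(text) % 128, sum(map(ord, text)) % 128]


def apply_inference_processor(
    doc: Dict[str, Any],
    embed: Callable[[str], List[int]],
    input_field: str = "text",
    output_field: str = "text_embedding",
) -> Dict[str, Any]:
    """Return a copy of doc with embed(doc[input_field]) stored in output_field."""
    enriched = dict(doc)  # leave the caller's document untouched
    enriched[output_field] = embed(doc[input_field])
    return enriched


doc = {"title": "Greeting", "text": "hello"}
print(apply_inference_processor(doc, fake_embed))
```

The original fields are preserved and the vector is added alongside them, which is why the index mapping needs a field matching output_field.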

Create index

Next, create the mapping of the destination index, which is the index that will hold the embeddings the model generates from your input text. The destination index must have a field with the dense_vector field type to index the output of the Cohere model.

Let's create an index named cohere-wiki-embeddings with the mappings we need.

client.indices.delete(index="cohere-wiki-embeddings", ignore_unavailable=True)
client.indices.create(
    index="cohere-wiki-embeddings",
    settings={"index": {"default_pipeline": "cohere_embeddings"}},
    mappings={
        "properties": {
            "text_embedding": {
                "type": "dense_vector",
                "dims": 1024,
                "element_type": "byte"
            },
            "text": {"type": "text"},
            "wiki_id": {"type": "integer"},
            "url": {"type": "text"},
            "views": {"type": "float"},
            "langs": {"type": "integer"},
            "title": {"type": "text"},
            "paragraph_id": {"type": "integer"},
            "id": {"type": "integer"}
        }
    },
)
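
The dense_vector mapping has to agree with the inference endpoint: this tutorial pairs embedding_type int8 with element_type byte, and dims 1024 with the embed-english-v3.0 model. A small helper can keep the two in sync. The helper name and the lookup table are assumptions for illustration; only the int8 to byte pairing comes from this tutorial, so check the other entries against the Elasticsearch documentation before relying on them.

```python
def dense_vector_mapping(dims: int, embedding_type: str) -> dict:
    """Build a dense_vector field mapping for a given embedding type."""
    # Assumed correspondence between endpoint embedding types and
    # dense_vector element types; only int8 -> byte is taken from the tutorial.
    element_types = {"float": "float", "int8": "byte", "byte": "byte"}
    if embedding_type not in element_types:
        raise ValueError(f"unsupported embedding_type: {embedding_type!r}")
    if dims <= 0:
        raise ValueError("dims must be a positive integer")
    return {
        "type": "dense_vector",
        "dims": dims,
        "element_type": element_types[embedding_type],
    }


print(dense_vector_mapping(1024, "int8"))
```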

Insert Documents

Let's insert our example wiki dataset. You need a production Cohere account to complete this step; otherwise the document ingest will time out because of the API request rate limits.

url = "https://raw.githubusercontent.com/cohere-ai/notebooks/main/notebooks/data/embed_jobs_sample_data.jsonl"
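response = requests.get(url)
response.raise_for_status()  # Fail early if the download did not succeed

jsonl_data = response.content.decode("utf-8").splitlines()

# Build one bulk action per JSONL line, targeting the index created above
documents = []
for line in jsonl_data:
    if not line.strip():
        continue  # Skip blank lines
    data_dict = json.loads(line)
    documents.append({
        "_index": "cohere-wiki-embeddings",
        "_source": data_dict,
    })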

helpers.bulk(client, documents)

print("Done indexing documents into `cohere-wiki-embeddings` index!")
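
For larger files you may not want to hold every action in a list. The sketch below is one alternative, assuming the same JSONL layout as the sample data: a generator that yields bulk actions lazily and reports the line number of any malformed record. The function name jsonl_to_actions is made up for this example.

```python
import json
from typing import Iterable, Iterator


def jsonl_to_actions(lines: Iterable[str], index: str) -> Iterator[dict]:
    """Yield one bulk action per non-empty JSONL line."""
    for line_number, line in enumerate(lines, start=1):
        line = line.strip()
        if not line:
            continue  # tolerate blank lines
        try:
            source = json.loads(line)
        except json.JSONDecodeError as exc:
            raise ValueError(f"line {line_number} is not valid JSON: {exc}") from exc
        yield {"_index": index, "_source": source}


# Tiny made-up sample standing in for the downloaded file.
sample = ['{"id": 1, "text": "first"}', "", '{"id": 2, "text": "second"}']
actions = list(jsonl_to_actions(sample, "cohere-wiki-embeddings"))
print(actions)
```

Because helpers.bulk accepts any iterable of actions, the generator can be passed to it directly in place of the documents list.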

Hybrid search

After the dataset has been enriched with the embeddings, you can query the data using hybrid search.

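Pass a query_vector_builder to the k-nearest neighbor (kNN) vector search API, and provide the query text and the inference endpoint you used to create the embeddings. The query clause in the same request adds a lexical multi_match over the text and title fields, which is what makes the search hybrid.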

query = "When were the semi-finals of the 2022 FIFA world cup played?"

response = client.search(
    index="cohere-wiki-embeddings",
    size=100,
    knn={
        "field": "text_embedding",
        "query_vector_builder": {
            "text_embedding": {
                "model_id": "cohere_embeddings",
                "model_text": query,
            }
        },
        "k": 10,
        "num_candidates": 50,
    },
    query={
      "multi_match": {
          "query": query,
          "fields": ["text", "title"]
        }
      }
)

raw_documents = response["hits"]["hits"]

for document in raw_documents[0:10]:
  print(f'Title: {document["_source"]["title"]}\nText: {document["_source"]["text"]}\n')

documents = []
for hit in response["hits"]["hits"]:
    documents.append(hit["_source"]["text"])
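
As the Elasticsearch kNN search documentation describes it, when a request carries both a knn clause and a query clause, the two sets of matches are combined as a disjunction and each hit's score is the sum of its kNN and query scores, optionally weighted by a boost on each clause. The sketch below imitates that sum with made-up scores so you can see why a document found by both retrievers tends to rise. It is an illustration only and does not call Elasticsearch.

```python
from typing import Dict, List, Tuple


def combine_scores(
    query_scores: Dict[str, float],
    knn_scores: Dict[str, float],
    query_boost: float = 1.0,
    knn_boost: float = 1.0,
) -> List[Tuple[str, float]]:
    """Sum boosted lexical and kNN scores per document, highest first."""
    combined: Dict[str, float] = {}
    for doc_id, score in query_scores.items():
        combined[doc_id] = combined.get(doc_id, 0.0) + query_boost * score
    for doc_id, score in knn_scores.items():
        combined[doc_id] = combined.get(doc_id, 0.0) + knn_boost * score
    return sorted(combined.items(), key=lambda item: item[1], reverse=True)


# Made-up scores: "b" is matched by both retrievers.
bm25 = {"a": 4.0, "b": 2.5}
knn = {"b": 0.75, "c": 0.5}
print(combine_scores(bm25, knn))
```

The two score scales are quite different (lexical scores are unbounded, while cosine-based kNN scores are not), so a plain sum can be dominated by one side.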

Ranking

In order to effectively combine the results from our vector and BM25 retrieval, we can use Cohere's Rerank 3 model through the inference API to provide a final, more precise, semantic reranking of our results.

First, create an inference endpoint with your Cohere API key. Make sure to specify a name for your endpoint, and the model_id of one of the rerank models. In this example we will use Rerank 3.

client.options(ignore_status=[404]).inference.delete_model(inference_id="cohere_rerank")

client.inference.put_model(
    task_type="rerank",
    inference_id="cohere_rerank",
    body={
        "service": "cohere",
        "service_settings": {
            "api_key": COHERE_API_KEY,
            "model_id": "rerank-english-v3.0"
        },
        "task_settings": {
            "top_n": 10,
        },
    }
)

You can now rerank your results using that inference endpoint. Here we will pass in the query we used for retrieval, along with the documents we just retrieved using hybrid search.

The inference service will respond with a list of documents in descending order of relevance. Each document has a corresponding index (reflecting the order the documents were in when sent to the inference endpoint), and if the "return_documents" task setting is True, the document texts will be included as well.

In this case we will set return_documents to False and reconstruct the input documents from the index returned in the response.

response = client.inference.inference(
    inference_id="cohere_rerank",
    body={
        "query": query,
        "input": documents,
        "task_settings": {
            "return_documents": False
            }
        }
)

ranked_documents = []
for document in response.body["rerank"]:
  ranked_documents.append({
      "title": raw_documents[int(document["index"])]["_source"]["title"],
      "text": raw_documents[int(document["index"])]["_source"]["text"]
  })

for document in ranked_documents[0:10]:
  print(f"Title: {document['title']}\nText: {document['text']}\n")
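
The index-based reconstruction above is easy to check offline. The sketch below runs the same logic against a made-up list of hits and a made-up rerank result shaped like the response used in this tutorial (an index plus a relevance score per entry). The helper name reorder_by_rerank and the sample data are assumptions for illustration.

```python
from typing import Any, Dict, List


def reorder_by_rerank(
    hits: List[Dict[str, Any]], rerank_results: List[Dict[str, Any]]
) -> List[Dict[str, str]]:
    """Map each rerank entry back to the hit it refers to, keeping rerank order."""
    ranked = []
    for result in rerank_results:
        source = hits[int(result["index"])]["_source"]
        ranked.append({"title": source["title"], "text": source["text"]})
    return ranked


# Made-up search hits, in the order they were sent to the reranker.
hits = [
    {"_source": {"title": "A", "text": "alpha"}},
    {"_source": {"title": "B", "text": "beta"}},
    {"_source": {"title": "C", "text": "gamma"}},
]
# Made-up rerank output: most relevant first, top_n = 2.
rerank_results = [
    {"index": 2, "relevance_score": 0.9},
    {"index": 0, "relevance_score": 0.4},
]
print(reorder_by_rerank(hits, rerank_results))
```

This only works if the list sent as input and the list indexed afterwards are in the same order, which is why both are built from the same response["hits"]["hits"].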

Now that we have ranked our results, we can turn this into a RAG system with Cohere's Chat API. Pass in the retrieved documents along with the query, and see the grounded response from Cohere's generative model Command R+.

First, we will create the Cohere client.

co = cohere.Client(COHERE_API_KEY)

Next, we can easily get a grounded generation with citations from the Cohere Chat API. We simply pass in the user query and documents retrieved from Elastic to the API, and print out our grounded response.

response = co.chat(
    message=query,
    documents=ranked_documents,
    model='command-r-plus'
)

source_documents = []
for citation in response.citations or []:  # citations can be empty when nothing was cited
  for document_id in citation.document_ids:
    if document_id not in source_documents:
      source_documents.append(document_id)

print(f"Query: {query}")
print(f"Response: {response.text}")
print("Sources:")
for document in response.documents:
  if document['id'] in source_documents:
    print(f"{document['title']}: {document['text']}")
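
The citation loop above can be exercised without calling the API. The sketch below repeats the de-duplication step on stand-in objects that expose only a document_ids attribute, as the code above expects. The SimpleNamespace objects and the IDs are invented for this example and are not real Cohere responses.

```python
from types import SimpleNamespace
from typing import Iterable, List, Optional


def cited_document_ids(citations: Optional[Iterable]) -> List[str]:
    """Collect document IDs from citations, de-duplicated, in first-seen order."""
    ordered_ids: List[str] = []
    for citation in citations or []:  # tolerate a missing citations list
        for document_id in citation.document_ids:
            if document_id not in ordered_ids:
                ordered_ids.append(document_id)
    return ordered_ids


# Invented citations: two spans, sharing one source document.
fake_citations = [
    SimpleNamespace(text="first cited span", document_ids=["doc_0", "doc_2"]),
    SimpleNamespace(text="second cited span", document_ids=["doc_2", "doc_1"]),
]
print(cited_document_ids(fake_citations))
```

Keeping first-seen order means the sources print in the order the answer cites them.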

And there you have it! A quick and easy implementation of hybrid search and RAG with Cohere and Elastic.