End-to-end RAG using Elasticsearch and Cohere

Learn how to use the Elasticsearch Inference API for semantic search and Cohere’s APIs for RAG.

For this example, you will need:

  • An Elastic Cloud serverless project (see the note below for other deployment options)
  • A Cohere API key (a production key is recommended, since trial usage is rate limited)

Note: While this tutorial integrates Cohere with an Elastic Cloud serverless project, it also works with a self-managed Elasticsearch deployment or a standard Elastic Cloud deployment; you only need to swap the serverless Elasticsearch client for the standard one.
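For example, with a standard Elastic Cloud deployment you would install the elasticsearch package instead of elasticsearch_serverless and connect by Cloud ID. A minimal sketch, assuming an Elastic Cloud deployment (the cloud_id and api_key values are placeholders):

PYTHON
# Hypothetical equivalent for a standard Elastic Cloud deployment
# (pip install elasticsearch); the values below are placeholders.
from elasticsearch import Elasticsearch

client = Elasticsearch(
    cloud_id="YOUR_CLOUD_ID",
    api_key="YOUR_API_KEY",
)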

If you don’t have an Elastic Cloud deployment, sign up here for a free trial and request access to Elastic Serverless.

To get started, we’ll need to connect to our Elastic Serverless deployment using the Python client.

First, we need to pip install the following packages:

  • elasticsearch_serverless
  • cohere

After installing them, find your endpoint URL and create your API key in the Serverless dashboard.

PYTHON
pip install elasticsearch_serverless cohere

Next, we need to import the required modules. 🔐 NOTE: getpass lets us securely prompt the user for credentials without echoing them to the terminal.

PYTHON
from elasticsearch_serverless import Elasticsearch, helpers
from getpass import getpass
import cohere
import json
import requests

Now we can instantiate the Python Elasticsearch client.

First, we prompt the user for their endpoint and encoded API key. Then we create a client object, an instance of the Elasticsearch class.

When creating your Elastic Serverless API key, make sure to turn on Control security privileges and edit the cluster privileges to specify "cluster": ["all"].
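For reference, the role descriptor you edit when creating the key would then look something like this (an illustrative sketch; the role name my_role is arbitrary, and in the serverless UI this is edited as JSON):

PYTHON
# Hypothetical role descriptor matching the note above; only the
# cluster privileges line is required for this tutorial.
role_descriptors = {
    "my_role": {
        "cluster": ["all"],
    }
}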

PYTHON
ELASTICSEARCH_ENDPOINT = getpass("Elastic Endpoint: ")
ELASTIC_API_KEY = getpass("Elastic encoded API key: ")  # Use the encoded API key

client = Elasticsearch(
    ELASTICSEARCH_ENDPOINT,
    api_key=ELASTIC_API_KEY,
)

Confirm that the client has connected with this test:

PYTHON
print(client.info())

Create the inference endpoint

Let’s create the inference endpoint by using the Create inference API.

You’ll need a Cohere API key for this, which you can find in your Cohere account under the API keys section. A production key is required to complete the steps in this notebook, as the Cohere free trial API usage is limited.

PYTHON
COHERE_API_KEY = getpass("Enter Cohere API key: ")

# Delete any existing endpoint with this ID; ignoring 404s makes this safe to re-run.
client.options(ignore_status=[404]).inference.delete_model(inference_id="cohere_embeddings")

client.inference.put_model(
    task_type="text_embedding",
    inference_id="cohere_embeddings",
    body={
        "service": "cohere",
        "service_settings": {
            "api_key": COHERE_API_KEY,
            "model_id": "embed-english-v3.0",
            "embedding_type": "int8",
            "similarity": "cosine",
        },
        "task_settings": {},
    },
)

Create an ingest pipeline with an inference processor

Create an ingest pipeline with an inference processor by using the put_pipeline method. Reference the inference endpoint created above as the model_id to infer against the data that is being ingested in the pipeline.

PYTHON
# Delete any existing pipeline with this ID; ignoring 404s makes this safe to re-run.
client.options(ignore_status=[404]).ingest.delete_pipeline(id="cohere_embeddings")

client.ingest.put_pipeline(
    id="cohere_embeddings",
    description="Ingest pipeline for Cohere inference.",
    processors=[
        {
            "inference": {
                "model_id": "cohere_embeddings",
                "input_output": {
                    "input_field": "text",
                    "output_field": "text_embedding",
                },
            }
        }
    ],
)

Let’s note a few important parameters from that API call:

  • inference: A processor that performs inference using a machine learning model.
  • model_id: Specifies the ID of the inference endpoint to be used. In this example, the model ID is set to cohere_embeddings.
  • input_output: Specifies the input and output fields:
      • input_field: Field name from which the dense_vector representation is created.
      • output_field: Field name which contains inference results.
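Before ingesting the dataset, you can sanity-check the pipeline by running a sample document through it with the simulate pipeline API, assuming it is available on your deployment. A minimal sketch (the sample text is arbitrary):

PYTHON
# Run one illustrative document through the pipeline and confirm that the
# inference processor adds a text_embedding field to the document source.
response = client.ingest.simulate(
    id="cohere_embeddings",
    docs=[{"_source": {"text": "Elasticsearch is a distributed search engine."}}],
)
print(response["docs"][0]["doc"]["_source"].keys())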

Create index

Next, we need to create the mapping of the destination index – the index that will contain the embeddings the model creates from your input text. The destination index must have a field with the dense_vector field type to index the output of the Cohere model.

Let’s create an index named cohere-wiki-embeddings with the mappings we need.

PYTHON
client.indices.delete(index="cohere-wiki-embeddings", ignore_unavailable=True)
client.indices.create(
    index="cohere-wiki-embeddings",
    # Route all ingested documents through the cohere_embeddings pipeline by default.
    settings={"index": {"default_pipeline": "cohere_embeddings"}},
    mappings={
        "properties": {
            "text_embedding": {
                "type": "dense_vector",
                "dims": 1024,
                "element_type": "byte",
            },
            "text": {"type": "text"},
            "wiki_id": {"type": "integer"},
            "url": {"type": "text"},
            "views": {"type": "float"},
            "langs": {"type": "integer"},
            "title": {"type": "text"},
            "paragraph_id": {"type": "integer"},
            "id": {"type": "integer"},
        }
    },
)

Insert documents

Let’s insert our example wiki dataset. You need a production Cohere account to complete this step; otherwise, the document ingest will time out due to API request rate limits.

PYTHON
# Download the sample wiki dataset as JSONL and build one bulk action per line.
url = "https://raw.githubusercontent.com/cohere-ai/notebooks/main/notebooks/data/embed_jobs_sample_data.jsonl"
response = requests.get(url)

jsonl_data = response.content.decode("utf-8").splitlines()

documents = []
for line in jsonl_data:
    data_dict = json.loads(line)
    documents.append(
        {
            "_index": "cohere-wiki-embeddings",
            "_source": data_dict,
        }
    )

helpers.bulk(client, documents)

print("Done indexing documents into `cohere-wiki-embeddings` index!")

Hybrid search

After the dataset has been enriched with the embeddings, you can query the data using hybrid search.

Pass a query_vector_builder to the k-nearest neighbor (kNN) vector search API, and provide the query text and the model you have used to create the embeddings.

PYTHON
query = "When were the semi-finals of the 2022 FIFA world cup played?"

# Combine lexical retrieval (multi_match) and vector retrieval (knn) in one request.
response = client.search(
    index="cohere-wiki-embeddings",
    size=100,
    knn={
        "field": "text_embedding",
        "query_vector_builder": {
            "text_embedding": {
                "model_id": "cohere_embeddings",
                "model_text": query,
            }
        },
        "k": 10,
        "num_candidates": 50,
    },
    query={
        "multi_match": {
            "query": query,
            "fields": ["text", "title"],
        }
    },
)

raw_documents = response["hits"]["hits"]

for document in raw_documents[0:10]:
    print(f'Title: {document["_source"]["title"]}\nText: {document["_source"]["text"]}\n')

documents = []
for hit in response["hits"]["hits"]:
    documents.append(hit["_source"]["text"])

Ranking

To effectively combine the results from our vector and BM25 retrieval, we can use Cohere’s Rerank 3 model through the inference API to provide a final, more precise semantic reranking of our results.

First, create an inference endpoint with your Cohere API key. Make sure to specify a name for your endpoint, and the model_id of one of the rerank models. In this example we will use Rerank 3.

PYTHON
# Delete any existing endpoint with this ID; ignoring 404s makes this safe to re-run.
client.options(ignore_status=[404]).inference.delete_model(inference_id="cohere_rerank")

client.inference.put_model(
    task_type="rerank",
    inference_id="cohere_rerank",
    body={
        "service": "cohere",
        "service_settings": {
            "api_key": COHERE_API_KEY,
            "model_id": "rerank-english-v3.0",
        },
        "task_settings": {
            "top_n": 10,
        },
    },
)

You can now rerank your results using that inference endpoint. Here we will pass in the query we used for retrieval, along with the documents we just retrieved using hybrid search.

The inference service will respond with a list of documents in descending order of relevance. Each document has a corresponding index (reflecting the order in which the documents were sent to the inference endpoint), and if the return_documents task setting is True, the document texts are included as well.

In this case we will set return_documents to False and reconstruct the input documents based on the index returned in the response.

PYTHON
response = client.inference.inference(
    inference_id="cohere_rerank",
    body={
        "query": query,
        "input": documents,
        "task_settings": {
            "return_documents": False,
        },
    },
)

# Map each reranked result back to the original hit via its returned index.
ranked_documents = []
for document in response.body["rerank"]:
    ranked_documents.append(
        {
            "title": raw_documents[int(document["index"])]["_source"]["title"],
            "text": raw_documents[int(document["index"])]["_source"]["text"],
        }
    )

for document in ranked_documents[0:10]:
    print(f"Title: {document['title']}\nText: {document['text']}\n")

Now that we have ranked our results, we can easily turn this into a RAG system with Cohere’s Chat API. Pass in the retrieved documents along with the query, and see the grounded response from Cohere’s generative model Command R+.

First, we will create the Cohere client.

PYTHON
co = cohere.Client(COHERE_API_KEY)

Next, we can easily get a grounded generation with citations from the Cohere Chat API. We simply pass in the user query and documents retrieved from Elastic to the API, and print out our grounded response.

PYTHON
response = co.chat(
    message=query,
    documents=ranked_documents,
    model="command-r-plus",
)

# Collect the IDs of the documents the model actually cited in its answer.
source_documents = []
for citation in response.citations:
    for document_id in citation.document_ids:
        if document_id not in source_documents:
            source_documents.append(document_id)

print(f"Query: {query}")
print(f"Response: {response.text}")
print("Sources:")
for document in response.documents:
    if document["id"] in source_documents:
        print(f"{document['title']}: {document['text']}")

And there you have it! A quick and easy implementation of hybrid search and RAG with Cohere and Elastic.