pinecone_datasets.dataset

  1import glob
  2import sys
  3import os
  4import itertools
  5import time
  6import json
  7import asyncio
  8import warnings
  9from urllib.parse import urlparse
 10from dataclasses import dataclass
 11from importlib.metadata import version
 12
 13import gcsfs
 14import s3fs
 15import pandas as pd
 16from tqdm.auto import tqdm
 17import pyarrow.parquet as pq
 18from pydantic import ValidationError
 19from typing import Any, Generator, Iterator, List, Union, Dict, Optional, Tuple
 20
 21from pinecone_datasets import cfg
 22from pinecone_datasets.catalog import DatasetMetadata
 23from pinecone_datasets.fs import get_cloud_fs, LocalFileSystem
 24
 25import pinecone as pc
 26from pinecone import Index
 27
 28
 29class DatasetInitializationError(Exception):
 30    long_message = """
 31    This dataset was not initialized from a path; it was created in memory, e.g. with Dataset.from_pandas(...).
 32    Therefore it cannot be reloaded from a path or use methods that require a path.
 33    To load a dataset from a path, use the `from_path` method and pass a valid path.
 34    """
 35
 36    def __init__(self, message=long_message):
 37        self.message = message
 38        super().__init__(self.message)
 39
 40
 41# TODO: import from Client
 42@dataclass
 43class UpsertResponse:
 44    upserted_count: int
 45
 46
 47def iter_pandas_dataframe_slices(
 48    df: pd.DataFrame, batch_size, return_indexes
 49) -> Generator[List[Dict[str, Any]], None, None]:
 50    for i in range(0, len(df), batch_size):
 51        if return_indexes:
 52            yield (i, df.iloc[i : i + batch_size].to_dict(orient="records"))
 53        else:
 54            yield df.iloc[i : i + batch_size].to_dict(orient="records")
 55
 56
 57def iter_pandas_dataframe_single(
 58    df: pd.DataFrame,
 59) -> Generator[Dict[str, Any], None, None]:
 60    for i in range(len(df)):
 61        yield df.iloc[i : i + 1].to_dict(orient="records")[0]
 62
 63
 64class Dataset(object):
 65    @classmethod
 66    def from_path(cls, dataset_path, **kwargs):
 67        """
 68        Create a Dataset object from local or cloud storage
 69        Args:
 70            dataset_path (str): a path to a local or cloud storage path containing a valid dataset.
 71
 72        Returns:
 73            Dataset: a Dataset object
 74        """
 75        return cls(dataset_path=dataset_path, **kwargs)
 76
 77    @classmethod
 78    def from_catalog(cls, dataset_id, catalog_base_path: str = "", **kwargs):
 79        """
 80        Load a dataset from Pinecone's Datasets catalog, or from your own endpoint.
 81
 82        Args:
 83            dataset_id (str): the id of the dataset to load within a catalog
 84            catalog_base_path (str): the catalog's base path. Defaults to the DATASETS_CATALOG_BASEPATH environment variable.
 85                                     If neither is set, Pinecone's public catalog is used.
 86
 87        Returns:
 88            Dataset: a Dataset object
 89        """
 90        catalog_base_path = (
 91            catalog_base_path
 92            if catalog_base_path
 93            else os.environ.get("DATASETS_CATALOG_BASEPATH", cfg.Storage.endpoint)
 94        )
 95        dataset_path = os.path.join(catalog_base_path, f"{dataset_id}")
 96        return cls(dataset_path=dataset_path, **kwargs)
 97
 98    @classmethod
 99    def from_pandas(
100        cls,
101        documents: pd.DataFrame,
102        metadata: DatasetMetadata,
103        documents_column_mapping: Optional[Dict] = None,
104        queries: Optional[pd.DataFrame] = None,
105        queries_column_mapping: Optional[Dict] = None,
106        **kwargs,
107    ) -> "Dataset":
108        """
109        Create a Dataset object from a pandas DataFrame
110
111        Args:
112            documents (pd.DataFrame): a pandas DataFrame containing the documents
113            documents_column_mapping (Dict): a dictionary mapping the columns of the documents DataFrame to the Pinecone Datasets Schema
114            queries (pd.DataFrame): a pandas DataFrame containing the queries
115            queries_column_mapping (Dict): a dictionary mapping the columns of the queries DataFrame to the Pinecone Datasets Schema
116
117        Keyword Args:
118            kwargs (Dict): additional arguments to pass to the fsspec constructor
119
120        Returns:
121            Dataset: a Dataset object
122        """
123        clazz = cls(dataset_path=None, **kwargs)
124        clazz._documents = cls._read_pandas_dataframe(
125            documents, documents_column_mapping, cfg.Schema.Names.documents
126        )
127        clazz._queries = cls._read_pandas_dataframe(
128            queries, queries_column_mapping, cfg.Schema.Names.queries
129        )
130        clazz._metadata = metadata
131        return clazz
132
133    @staticmethod
134    def _read_pandas_dataframe(
135        df: pd.DataFrame,
136        column_mapping: Dict[str, str],
137        schema: List[Tuple[str, bool, Any]],
138    ) -> pd.DataFrame:
139        """
140        Reads a pandas DataFrame and validates it against a schema.
141
142        Args:
143            df (pd.DataFrame): the pandas DataFrame to read
144            column_mapping (Dict[str, str]): a dictionary mapping the columns of the DataFrame to the Pinecone Datasets Schema (col_name, pinecone_name)
145            schema (List[Tuple[str, bool, Any]]): the schema to validate against (column_name, is_nullable, null_value)
146
147        Returns:
148            pd.DataFrame: the validated, renamed DataFrame
149        """
150        if df is None or df.empty:
151            return pd.DataFrame(columns=[column_name for column_name, _, _ in schema])
152        else:
153            if column_mapping is not None:
154                df.rename(columns=column_mapping, inplace=True)
155            for column_name, is_nullable, null_value in schema:
156                if column_name not in df.columns and not is_nullable:
157                    raise ValueError(
158                        f"error, DataFrame does not match the Pinecone Datasets Schema: {column_name} not found"
159                    )
160                elif column_name not in df.columns and is_nullable:
161                    df[column_name] = null_value
162            return df[[column_name for column_name, _, _ in schema]]
163
164    def __init__(
165        self,
166        dataset_path: str,
167        **kwargs,
168    ) -> None:
169        """
170        Dataset class to load and query datasets from the Pinecone Datasets catalog.
171        See `from_path` and `from_catalog` for examples on how to load a dataset.
172
173        Examples:
174            ```python
175            from pinecone_datasets import Dataset
176            dataset = Dataset.from_catalog("dataset_name")
177            # or
178            dataset = Dataset.from_path("gs://my-bucket/my-dataset")
179
180            for doc in dataset.iter_documents(batch_size=100):
181                index.upsert(doc)
182            for query in dataset.iter_queries():
183                results = index.query(**query)
184                # do something with the results
185            # or
186            dataset.documents # returns a pandas DataFrame
187            dataset.queries # returns a pandas DataFrame
188            ```
189
190        """
191        self._config = cfg
192        if dataset_path is not None:
193            endpoint = urlparse(dataset_path)._replace(path="").geturl()
194            self._fs = get_cloud_fs(endpoint, **kwargs)
195            self._dataset_path = dataset_path
196            if not self._fs.exists(self._dataset_path):
197                raise FileNotFoundError(
198                    "Dataset does not exist. Please check the path or dataset_id"
199                )
200        else:
201            self._fs = None
202            self._dataset_path = None
203        self._documents = None
204        self._queries = None
205        self._metadata = None
206        self._pinecone_client = None
207
208    def _is_datatype_exists(self, data_type: str) -> bool:
209        if not self._fs:
210            raise DatasetInitializationError()
211        return self._fs.exists(os.path.join(self._dataset_path, data_type))
212
213    @staticmethod
214    def _convert_metadata_from_dict_to_json(metadata: Optional[dict]) -> str:
215        if pd.isna(metadata):
216            return None
217        if metadata and not isinstance(metadata, dict):
218            raise TypeError(
219                f"metadata must be a dict but it is {type(metadata)}; metadata = {metadata}"
220            )
221        return json.dumps(metadata, ensure_ascii=False)
222
223    @staticmethod
224    def _convert_metadata_from_json_to_dict(metadata: Optional[str]) -> dict:
225        if metadata is None:
226            return None
227        if not isinstance(metadata, str):
228            if isinstance(metadata, dict):
229                return metadata
230            else:
231                raise TypeError("metadata must be a string or dict")
232        return json.loads(metadata)
233
234    def _safe_read_from_path(self, data_type: str) -> pd.DataFrame:
235        if not self._fs:
236            raise DatasetInitializationError()
237
238        read_path_str = os.path.join(self._dataset_path, data_type, "*.parquet")
239        read_path = self._fs.glob(read_path_str)
240        if self._is_datatype_exists(data_type):
241            dataset = pq.ParquetDataset(read_path, filesystem=self._fs)
242            dataset_schema_names = dataset.schema.names
243            columns_to_null = []
244            columns_not_null = []
245            for column_name, is_nullable, null_value in getattr(
246                self._config.Schema.Names, data_type
247            ):
248                if column_name not in dataset_schema_names and not is_nullable:
249                    raise ValueError(
250                        f"error, file does not match the Pinecone Datasets Schema: {column_name} not found"
251                    )
252                elif column_name not in dataset_schema_names and is_nullable:
253                    columns_to_null.append((column_name, null_value))
254                else:
255                    columns_not_null.append(column_name)
256            try:
257                # TODO: use of the columns_not_null and columns_to_null is only a workaround for proper schema validation and versioning
258                df = dataset.read_pandas(columns=columns_not_null).to_pandas()
259
260                # metadata is expected to be a dict (legacy) or a JSON string
261                if data_type == "documents":
262                    df["metadata"] = df["metadata"].apply(
263                        self._convert_metadata_from_json_to_dict
264                    )
265                elif data_type == "queries":
266                    df["filter"] = df["filter"].apply(
267                        self._convert_metadata_from_json_to_dict
268                    )
269
270                for column_name, null_value in columns_to_null:
271                    df[column_name] = null_value
272                return df
273            # TODO: add more specific error handling, explain what is wrong
274            except Exception as e:
275                print("error while reading dataset: {}".format(e), file=sys.stderr)
276                raise
277        else:
278            warnings.warn(
279                "No data found at: {}. Returning an empty DataFrame".format(
280                    read_path_str
281                ),
282                UserWarning,
283                stacklevel=0,
284            )
285            return pd.DataFrame(
286                columns=[
287                    col[0] for col in getattr(self._config.Schema.Names, data_type)
288                ]
289            )
290
291    def _load_metadata(self) -> DatasetMetadata:
292        if not self._fs:
293            raise DatasetInitializationError()
294
295        with self._fs.open(
296            os.path.join(self._dataset_path, "metadata.json"), "rb"
297        ) as f:
298            metadata = json.load(f)
299        try:
300            out = DatasetMetadata(**metadata)
301            return out
302        # TODO: add more specific error handling, explain what is wrong
303        except ValidationError as e:
304            raise e
305
306    def __getitem__(self, key: str):
307        if key in ["documents", "queries"]:
308            return getattr(self, key)
309        else:
310            raise KeyError("Dataset does not have key: {}".format(key))
311
312    def __len__(self) -> int:
313        return self.documents.shape[0]
314
315    @property
316    def documents(self) -> pd.DataFrame:
317        if self._documents is None:
318            self._documents = self._safe_read_from_path("documents")
319        return self._documents
320
321    def iter_documents(
322        self, batch_size: int = 1, return_indexes=False
323    ) -> Iterator[List[Dict[str, Any]]]:
324        """
325        Iterates over the documents in the dataset.
326
327        Args:
328            batch_size (int, optional): The batch size to use for the iterator. Defaults to 1.
329
330        Returns:
331            Iterator[List[Dict[str, Any]]]: An iterator over the documents in the dataset.
332
333        Examples:
334            for batch in dataset.iter_documents(batch_size=100):
335                index.upsert(batch)
336        """
337        if isinstance(batch_size, int) and batch_size > 0:
338            return iter_pandas_dataframe_slices(
339                df=self.documents[self._config.Schema.documents_select_columns].dropna(
340                    axis=1, how="all"
341                ),
342                batch_size=batch_size,
343                return_indexes=return_indexes,
344            )
345        else:
346            raise ValueError("batch_size must be greater than 0")
347
348    @property
349    def queries(self) -> pd.DataFrame:
350        if self._queries is None:
351            self._queries = self._safe_read_from_path("queries")
352        return self._queries
353
354    def iter_queries(self) -> Iterator[Dict[str, Any]]:
355        """
356        Iterates over the queries in the dataset.
357
358        Returns:
359            Iterator[Dict[str, Any]]: An iterator over the queries in the dataset.
360
361        Examples:
362            for query in dataset.iter_queries():
363                results = index.query(**query)
364                # do something with the results
365        """
366        return iter_pandas_dataframe_single(
367            self.queries[self._config.Schema.queries_select_columns]
368        )
369
370    @property
371    def metadata(self) -> DatasetMetadata:
372        if self._metadata is None:
373            self._metadata = self._load_metadata()
374        return self._metadata
375
376    def head(self, n: int = 5) -> pd.DataFrame:
377        return self.documents.head(n)
378
379    def to_path(self, dataset_path: str, **kwargs):
380        """
381        Saves the dataset to a local or cloud storage path.
382        """
383        fs = get_cloud_fs(dataset_path, **kwargs)
384
385        # save documents
386        documents_path = os.path.join(dataset_path, "documents")
387        fs.makedirs(documents_path, exist_ok=True)
388
389        documents_metadata_copy = self.documents["metadata"].copy()
390        try:
391            self.documents["metadata"] = self.documents["metadata"].apply(
392                self._convert_metadata_from_dict_to_json
393            )
394            self.documents.to_parquet(
395                os.path.join(documents_path, "part-0.parquet"),
396                engine="pyarrow",
397                index=False,
398                filesystem=fs,
399            )
400        finally:
401            self.documents["metadata"] = documents_metadata_copy
402        # save queries
403        if not self.queries.empty:
404            queries_path = os.path.join(dataset_path, "queries")
405            fs.makedirs(queries_path, exist_ok=True)
406            queries_filter_copy = self.queries["filter"].copy()
407            try:
408                self.queries["filter"] = self.queries["filter"].apply(
409                    self._convert_metadata_from_dict_to_json
410                )
411                self.queries.to_parquet(
412                    os.path.join(queries_path, "part-0.parquet"),
413                    engine="pyarrow",
414                    index=False,
415                    filesystem=fs,
416                )
417            finally:
418                self.queries["filter"] = queries_filter_copy
419        else:
420            warnings.warn("Queries are empty, not saving queries")
421
422        # save metadata
423        with fs.open(os.path.join(dataset_path, "metadata.json"), "w") as f:
424            json.dump(self.metadata.dict(), f)
425
426    def to_catalog(
427        self,
428        dataset_id: str,
429        catalog_base_path: str = "",
430        **kwargs,
431    ):
432        """
433        Saves the dataset to a catalog: catalog_base_path if given, otherwise the DATASETS_CATALOG_BASEPATH endpoint, otherwise Pinecone's public catalog.
434        """
435
436        # TODO: duplicated code
437
438        catalog_base_path = (
439            catalog_base_path
440            if catalog_base_path
441            else os.environ.get("DATASETS_CATALOG_BASEPATH", cfg.Storage.endpoint)
442        )
443        dataset_path = os.path.join(catalog_base_path, f"{dataset_id}")
444        self.to_path(dataset_path, **kwargs)
445
446    def _upsert_to_index(
447        self, index_name: str, namespace: str, batch_size: int, show_progress: bool
448    ):
449        pinecone_index = Index(index_name=index_name)
450
451        res = pinecone_index.upsert_from_dataframe(
452            self.documents[self._config.Schema.documents_select_columns].dropna(
453                axis=1, how="all"
454            ),
455            namespace=namespace,
456            batch_size=batch_size,
457            show_progress=show_progress,
458        )
459        return {"upserted_count": res.upserted_count}
460
461    def _set_pinecone_index(
462        self,
463        api_key: Optional[str] = None,
464        environment: Optional[str] = None,
465        **kwargs,
466    ) -> None:
467        pc.init(api_key=api_key, environment=environment, **kwargs)
468        self._pinecone_client = pc
469
470    def _create_index(
471        self,
472        index_name: str,
473        api_key: Optional[str] = None,
474        environment: Optional[str] = None,
475        **kwargs,
476    ) -> bool:
477        self._set_pinecone_index(api_key=api_key, environment=environment)
478        pinecone_index_list = self._pinecone_client.list_indexes()
479
480        if index_name in pinecone_index_list:
481            raise ValueError(
482                f"index {index_name} already exists; Pinecone Datasets can only be upserted to a new index"
483            )
484        else:
485            # create index
486            print("creating index")
487            try:
488                self._pinecone_client.create_index(
489                    name=index_name,
490                    dimension=self.metadata.dense_model.dimension,
491                    **kwargs,
492                )
493                print("index created")
494                return True
495            except Exception as e:
496                print(f"error creating index: {e}")
497                return False
498
499    def to_pinecone_index(
500        self,
501        index_name: str,
502        namespace: Optional[str] = "",
503        should_create_index: bool = True,
504        batch_size: int = 100,
505        show_progress: bool = True,
506        api_key: Optional[str] = None,
507        environment: Optional[str] = None,
508        **kwargs,
509    ):
510        """
511        Saves the dataset to a Pinecone index.
512
513        If api_key and environment are not passed, this function will look for two environment variables:
514        - PINECONE_API_KEY
515        - PINECONE_ENVIRONMENT
516
517        It then initializes a Pinecone client and upserts the dataset to the index.
518        The upsert is performed in batches to improve performance.
519
520        Args:
521            index_name (str): the name of the index to upsert to
522            namespace (str, optional): the namespace to use for the upsert. Defaults to "".
523            batch_size (int, optional): the batch size to use for the upsert. Defaults to 100.
524            show_progress (bool, optional): whether to show a progress bar while upserting. Defaults to True.
525
526        Keyword Args:
527            kwargs (Dict): additional arguments passed to create_index when creating the index
528            (see available parameters here: https://docs.pinecone.io/reference/create_index), or to pinecone.init when should_create_index is False.
529
530
531        Returns:
532            UpsertResponse: an object containing the upserted_count
533
534        Examples:
535            ```python
536            result = dataset.to_pinecone_index(index_name="my_index")
537            ```
538        """
539        if should_create_index:
540            if not self._create_index(
541                index_name, api_key=api_key, environment=environment, **kwargs
542            ):
543                raise RuntimeError("index creation failed")
544        else:
545            self._set_pinecone_index(api_key=api_key, environment=environment, **kwargs)
546
547        return self._upsert_to_index(
548            index_name=index_name,
549            namespace=namespace,
550            batch_size=batch_size,
551            show_progress=show_progress,
552        )
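Example: upserting a dataset into a Pinecone index with `to_pinecone_index`. A minimal sketch using only parameters from the signature above; the index name is illustrative, and credentials fall back to the PINECONE_API_KEY / PINECONE_ENVIRONMENT environment variables.

```python
import os

from pinecone_datasets import Dataset

ds = Dataset.from_catalog("quora_all-MiniLM-L6-bm25")  # illustrative dataset id

# Creates a new index sized from ds.metadata.dense_model.dimension and then
# upserts the documents in batches of 100. Pass should_create_index=False to
# upsert into an index that already exists.
result = ds.to_pinecone_index(
    index_name="my-new-index",
    namespace="",
    batch_size=100,
    show_progress=True,
    api_key=os.environ.get("PINECONE_API_KEY"),
    environment=os.environ.get("PINECONE_ENVIRONMENT"),
)
print(result["upserted_count"])
```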