pinecone_datasets.dataset
````python
import glob
import sys
import os
import itertools
import time
import json
import asyncio
import warnings
from urllib.parse import urlparse
from dataclasses import dataclass
from importlib.metadata import version

import gcsfs
import s3fs
import pandas as pd
from tqdm.auto import tqdm
import pyarrow.parquet as pq
from pydantic import ValidationError
from typing import Any, Generator, Iterator, List, Union, Dict, Optional, Tuple

from pinecone_datasets import cfg
from pinecone_datasets.catalog import DatasetMetadata
from pinecone_datasets.fs import get_cloud_fs, LocalFileSystem

import pinecone as pc
from pinecone import Index


class DatasetInitializationError(Exception):
    long_message = """
    This dataset was not initialized from a path, but from memory, e.g. Dataset.from_pandas(...).
    Therefore it cannot be reloaded from a path or use methods that require a path.
    To reload a dataset from a path, use the `from_path` method and pass a valid path.
    """

    def __init__(self, message=long_message):
        self.message = message
        super().__init__(self.message)


# TODO: import from Client
@dataclass
class UpsertResponse:
    upserted_count: int


def iter_pandas_dataframe_slices(
    df: pd.DataFrame, batch_size, return_indexes
) -> Generator[List[Dict[str, Any]], None, None]:
    for i in range(0, len(df), batch_size):
        if return_indexes:
            yield (i, df.iloc[i : i + batch_size].to_dict(orient="records"))
        else:
            yield df.iloc[i : i + batch_size].to_dict(orient="records")


def iter_pandas_dataframe_single(
    df: pd.DataFrame,
) -> Generator[Dict[str, Any], None, None]:
    for i in range(0, len(df), 1):
        yield df.iloc[i : i + 1].to_dict(orient="records")[0]


class Dataset(object):
    @classmethod
    def from_path(cls, dataset_path, **kwargs):
        """
        Create a Dataset object from local or cloud storage.

        Args:
            dataset_path (str): a local or cloud storage path containing a valid dataset.

        Returns:
            Dataset: a Dataset object
        """
        return cls(dataset_path=dataset_path, **kwargs)

    @classmethod
    def from_catalog(cls, dataset_id, catalog_base_path: str = "", **kwargs):
        """
        Load a dataset from Pinecone's Datasets catalog, or from your own endpoint.

        Args:
            dataset_id (str): the id of the dataset to load within a catalog
            catalog_base_path (str): the catalog's base path. Defaults to the DATASETS_CATALOG_BASEPATH
                environment variable. If neither is set, Pinecone's public catalog is used.

        Returns:
            Dataset: a Dataset object
        """
        catalog_base_path = (
            catalog_base_path
            if catalog_base_path
            else os.environ.get("DATASETS_CATALOG_BASEPATH", cfg.Storage.endpoint)
        )
        dataset_path = os.path.join(catalog_base_path, f"{dataset_id}")
        return cls(dataset_path=dataset_path, **kwargs)

    @classmethod
    def from_pandas(
        cls,
        documents: pd.DataFrame,
        metadata: DatasetMetadata,
        documents_column_mapping: Optional[Dict] = None,
        queries: Optional[pd.DataFrame] = None,
        queries_column_mapping: Optional[Dict] = None,
        **kwargs,
    ) -> "Dataset":
        """
        Create a Dataset object from a pandas DataFrame.

        Args:
            documents (pd.DataFrame): a pandas DataFrame containing the documents
            metadata (DatasetMetadata): the metadata describing the dataset
            documents_column_mapping (Dict): maps columns of the documents DataFrame to the Pinecone Datasets Schema
            queries (pd.DataFrame): a pandas DataFrame containing the queries
            queries_column_mapping (Dict): maps columns of the queries DataFrame to the Pinecone Datasets Schema

        Keyword Args:
            kwargs (Dict): additional arguments to pass to the fsspec constructor

        Returns:
            Dataset: a Dataset object
        """
        clazz = cls(dataset_path=None, **kwargs)
        clazz._documents = cls._read_pandas_dataframe(
            documents, documents_column_mapping, cfg.Schema.Names.documents
        )
        clazz._queries = cls._read_pandas_dataframe(
            queries, queries_column_mapping, cfg.Schema.Names.queries
        )
        clazz._metadata = metadata
        return clazz

    @staticmethod
    def _read_pandas_dataframe(
        df: pd.DataFrame,
        column_mapping: Dict[str, str],
        schema: List[Tuple[str, bool, Any]],
    ) -> pd.DataFrame:
        """
        Reads a pandas DataFrame and validates it against a schema.

        Args:
            df (pd.DataFrame): the pandas DataFrame to read
            column_mapping (Dict[str, str]): maps columns of the DataFrame to the Pinecone Datasets Schema (col_name, pinecone_name)
            schema (List[Tuple[str, bool, Any]]): the schema to validate against (column_name, is_nullable, null_value)

        Returns:
            pd.DataFrame: the validated, renamed DataFrame
        """
        if df is None or df.empty:
            return pd.DataFrame(columns=[column_name for column_name, _, _ in schema])
        else:
            if column_mapping is not None:
                df.rename(columns=column_mapping, inplace=True)
            for column_name, is_nullable, null_value in schema:
                if column_name not in df.columns and not is_nullable:
                    raise ValueError(
                        f"error, file does not match Pinecone Datasets Schema: {column_name} not found"
                    )
                elif column_name not in df.columns and is_nullable:
                    df[column_name] = null_value
            return df[[column_name for column_name, _, _ in schema]]

    def __init__(
        self,
        dataset_path: str,
        **kwargs,
    ) -> None:
        """
        Dataset class to load and query datasets from the Pinecone Datasets catalog.
        See `from_path` and `from_catalog` for examples of how to load a dataset.

        Examples:
            ```python
            from pinecone_datasets import Dataset

            dataset = Dataset.from_catalog("dataset_name")
            # or
            dataset = Dataset.from_path("gs://my-bucket/my-dataset")

            for batch in dataset.iter_documents(batch_size=100):
                index.upsert(batch)
            for query in dataset.iter_queries():
                results = index.query(**query)
                # do something with the results
            # or
            dataset.documents  # returns a pandas DataFrame
            dataset.queries    # returns a pandas DataFrame
            ```
        """
        self._config = cfg
        if dataset_path is not None:
            endpoint = urlparse(dataset_path)._replace(path="").geturl()
            self._fs = get_cloud_fs(endpoint, **kwargs)
            self._dataset_path = dataset_path
            if not self._fs.exists(self._dataset_path):
                raise FileNotFoundError(
                    "Dataset does not exist. Please check the path or dataset_id"
                )
        else:
            self._fs = None
            self._dataset_path = None
        self._documents = None
        self._queries = None
        self._metadata = None
        self._pinecone_client = None

    def _is_datatype_exists(self, data_type: str) -> bool:
        if not self._fs:
            raise DatasetInitializationError()
        return self._fs.exists(os.path.join(self._dataset_path, data_type))

    @staticmethod
    def _convert_metadata_from_dict_to_json(metadata: Optional[dict]) -> str:
        if pd.isna(metadata):
            return None
        if metadata and not isinstance(metadata, dict):
            raise TypeError(
                f"metadata must be a dict but it is {type(metadata)} meta = {metadata}"
            )
        return json.dumps(metadata, ensure_ascii=False)

    @staticmethod
    def _convert_metadata_from_json_to_dict(metadata: Optional[str]) -> dict:
        if metadata is None:
            return None
        if not isinstance(metadata, str):
            if isinstance(metadata, dict):
                return metadata
            else:
                raise TypeError("metadata must be a string or dict")
        return json.loads(metadata)

    def _safe_read_from_path(self, data_type: str) -> pd.DataFrame:
        if not self._fs:
            raise DatasetInitializationError()

        read_path_str = os.path.join(self._dataset_path, data_type, "*.parquet")
        read_path = self._fs.glob(read_path_str)
        if self._is_datatype_exists(data_type):
            dataset = pq.ParquetDataset(read_path, filesystem=self._fs)
            dataset_schema_names = dataset.schema.names
            columns_to_null = []
            columns_not_null = []
            for column_name, is_nullable, null_value in getattr(
                self._config.Schema.Names, data_type
            ):
                if column_name not in dataset_schema_names and not is_nullable:
                    raise ValueError(
                        f"error, file does not match Pinecone Datasets Schema: {column_name} not found"
                    )
                elif column_name not in dataset_schema_names and is_nullable:
                    columns_to_null.append((column_name, null_value))
                else:
                    columns_not_null.append(column_name)
            try:
                # TODO: use of columns_not_null and columns_to_null is only a workaround for proper schema validation and versioning
                df = dataset.read_pandas(columns=columns_not_null).to_pandas()

                # metadata is supposed to be a dict [if legacy] or a string
                if data_type == "documents":
                    df["metadata"] = df["metadata"].apply(
                        self._convert_metadata_from_json_to_dict
                    )
                elif data_type == "queries":
                    df["filter"] = df["filter"].apply(
                        self._convert_metadata_from_json_to_dict
                    )

                for column_name, null_value in columns_to_null:
                    df[column_name] = null_value
                return df
            # TODO: add more specific error handling, explain what is wrong
            except Exception as e:
                print("error while reading dataset: {}".format(e), file=sys.stderr)
                raise e
        else:
            warnings.warn(
                "WARNING: No data found at: {}. Returning empty DF".format(
                    read_path_str
                ),
                UserWarning,
                stacklevel=0,
            )
            return pd.DataFrame(
                columns=[
                    col[0] for col in getattr(self._config.Schema.Names, data_type)
                ]
            )

    def _load_metadata(self) -> DatasetMetadata:
        if not self._fs:
            raise DatasetInitializationError()

        with self._fs.open(
            os.path.join(self._dataset_path, "metadata.json"), "rb"
        ) as f:
            metadata = json.load(f)
        try:
            out = DatasetMetadata(**metadata)
            return out
        # TODO: add more specific error handling, explain what is wrong
        except ValidationError as e:
            raise e

    def __getitem__(self, key: str):
        if key in ["documents", "queries"]:
            return getattr(self, key)
        else:
            raise KeyError("Dataset does not have key: {}".format(key))

    def __len__(self) -> int:
        return self.documents.shape[0]

    @property
    def documents(self) -> pd.DataFrame:
        if self._documents is None:
            self._documents = self._safe_read_from_path("documents")
        return self._documents

    def iter_documents(
        self, batch_size: int = 1, return_indexes=False
    ) -> Iterator[List[Dict[str, Any]]]:
        """
        Iterates over the documents in the dataset.

        Args:
            batch_size (int, optional): The batch size to use for the iterator. Defaults to 1.
            return_indexes (bool, optional): when True, yields (start_index, batch) tuples instead
                of plain batches. Defaults to False.

        Returns:
            Iterator[List[Dict[str, Any]]]: An iterator over the documents in the dataset.

        Examples:
            for batch in dataset.iter_documents(batch_size=100):
                index.upsert(batch)
        """
        if isinstance(batch_size, int) and batch_size > 0:
            return iter_pandas_dataframe_slices(
                df=self.documents[self._config.Schema.documents_select_columns].dropna(
                    axis=1, how="all"
                ),
                batch_size=batch_size,
                return_indexes=return_indexes,
            )
        else:
            raise ValueError("batch_size must be greater than 0")

    @property
    def queries(self) -> pd.DataFrame:
        if self._queries is None:
            self._queries = self._safe_read_from_path("queries")
        return self._queries

    def iter_queries(self) -> Iterator[Dict[str, Any]]:
        """
        Iterates over the queries in the dataset.

        Returns:
            Iterator[Dict[str, Any]]: An iterator over the queries in the dataset.

        Examples:
            for query in dataset.iter_queries():
                results = index.query(**query)
                # do something with the results
        """
        return iter_pandas_dataframe_single(
            self.queries[self._config.Schema.queries_select_columns]
        )

    @property
    def metadata(self) -> DatasetMetadata:
        if self._metadata is None:
            self._metadata = self._load_metadata()
        return self._metadata

    def head(self, n: int = 5) -> pd.DataFrame:
        return self.documents.head(n)

    def to_path(self, dataset_path: str, **kwargs):
        """
        Saves the dataset to a local or cloud storage path.
        """
        fs = get_cloud_fs(dataset_path, **kwargs)

        # save documents
        documents_path = os.path.join(dataset_path, "documents")
        fs.makedirs(documents_path, exist_ok=True)

        documents_metadata_copy = self.documents["metadata"].copy()
        try:
            self.documents["metadata"] = self.documents["metadata"].apply(
                self._convert_metadata_from_dict_to_json
            )
            self.documents.to_parquet(
                os.path.join(documents_path, "part-0.parquet"),
                engine="pyarrow",
                index=False,
                filesystem=fs,
            )
        finally:
            self.documents["metadata"] = documents_metadata_copy

        # save queries
        if not self.queries.empty:
            queries_path = os.path.join(dataset_path, "queries")
            fs.makedirs(queries_path, exist_ok=True)
            queries_filter_copy = self.queries["filter"].copy()
            try:
                self.queries["filter"] = self.queries["filter"].apply(
                    self._convert_metadata_from_dict_to_json
                )
                self.queries.to_parquet(
                    os.path.join(queries_path, "part-0.parquet"),
                    engine="pyarrow",
                    index=False,
                    filesystem=fs,
                )
            finally:
                self.queries["filter"] = queries_filter_copy
        else:
            warnings.warn("Queries are empty, not saving queries")

        # save metadata
        with fs.open(os.path.join(dataset_path, "metadata.json"), "w") as f:
            json.dump(self.metadata.dict(), f)

    def to_catalog(
        self,
        dataset_id: str,
        catalog_base_path: str = "",
        **kwargs,
    ):
        """
        Saves the dataset to the public catalog.
        """

        # TODO: duplicated code

        catalog_base_path = (
            catalog_base_path
            if catalog_base_path
            else os.environ.get("DATASETS_CATALOG_BASEPATH", cfg.Storage.endpoint)
        )
        dataset_path = os.path.join(catalog_base_path, f"{dataset_id}")
        self.to_path(dataset_path, **kwargs)

    def _upsert_to_index(
        self, index_name: str, namespace: str, batch_size: int, show_progress: bool
    ):
        pinecone_index = Index(index_name=index_name)

        res = pinecone_index.upsert_from_dataframe(
            self.documents[self._config.Schema.documents_select_columns].dropna(
                axis=1, how="all"
            ),
            namespace=namespace,
            batch_size=batch_size,
            show_progress=show_progress,
        )
        return {"upserted_count": res.upserted_count}

    def _set_pinecone_index(
        self,
        api_key: Optional[str] = None,
        environment: Optional[str] = None,
        **kwargs,
    ) -> None:
        pc.init(api_key=api_key, environment=environment, **kwargs)
        self._pinecone_client = pc

    def _create_index(
        self,
        index_name: str,
        api_key: Optional[str] = None,
        environment: Optional[str] = None,
        **kwargs,
    ) -> bool:
        self._set_pinecone_index(api_key=api_key, environment=environment)
        pinecone_index_list = self._pinecone_client.list_indexes()

        if index_name in pinecone_index_list:
            raise ValueError(
                f"index {index_name} already exists; Pinecone Datasets can only be upserted to a new index"
            )
        else:
            # create index
            print("creating index")
            try:
                self._pinecone_client.create_index(
                    name=index_name,
                    dimension=self.metadata.dense_model.dimension,
                    **kwargs,
                )
                print("index created")
                return True
            except Exception as e:
                print(f"error creating index: {e}")
                return False

    def to_pinecone_index(
        self,
        index_name: str,
        namespace: Optional[str] = "",
        should_create_index: bool = True,
        batch_size: int = 100,
        show_progress: bool = True,
        api_key: Optional[str] = None,
        environment: Optional[str] = None,
        **kwargs,
    ):
        """
        Saves the dataset to a Pinecone index.

        This function will look for two environment variables:
        - PINECONE_API_KEY
        - PINECONE_ENVIRONMENT

        Then it will initialize a Pinecone client and perform an upsert to the index.
        The upsert is performed in batches to increase performance.

        Args:
            index_name (str): the name of the index to upsert to
            namespace (str, optional): the namespace to use for the upsert. Defaults to "".
            should_create_index (bool, optional): whether to create a new index before upserting.
                Defaults to True. If False, the index must already exist.
            batch_size (int, optional): the batch size to use for the upsert. Defaults to 100.
            show_progress (bool, optional): whether to show a progress bar while upserting. Defaults to True.
            api_key (str, optional): the Pinecone API key. Defaults to the PINECONE_API_KEY environment variable.
            environment (str, optional): the Pinecone environment. Defaults to the PINECONE_ENVIRONMENT environment variable.

        Keyword Args:
            kwargs (Dict): additional arguments passed to the Pinecone client when creating the index.
                See the available parameters here: https://docs.pinecone.io/reference/create_index

        Returns:
            UpsertResponse: an object containing the upserted_count

        Examples:
            ```python
            result = dataset.to_pinecone_index(index_name="my_index")
            ```
        """
        if should_create_index:
            if not self._create_index(
                index_name, api_key=api_key, environment=environment, **kwargs
            ):
                raise RuntimeError("index creation failed")
        else:
            self._set_pinecone_index(api_key=api_key, environment=environment, **kwargs)

        return self._upsert_to_index(
            index_name=index_name,
            namespace=namespace,
            batch_size=batch_size,
            show_progress=show_progress,
        )
````
```python
class DatasetInitializationError(Exception): ...
```
Raised when a dataset that was initialized from memory (e.g. `Dataset.from_pandas(...)`) is asked to perform an operation that requires a backing path. Such datasets cannot be reloaded from a path; use `from_path` with a valid path instead.
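For illustration, a path-dependent operation on an in-memory dataset raises this error. A minimal sketch (constructing `Dataset(dataset_path=None)` directly here only to demonstrate the failure mode):

```python
from pinecone_datasets.dataset import Dataset, DatasetInitializationError

ds = Dataset(dataset_path=None)  # in-memory dataset with no backing filesystem

try:
    ds.documents  # triggers a read from the (missing) path
except DatasetInitializationError as e:
    print("cannot load from path:", e)
```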
```python
def iter_pandas_dataframe_slices(
    df: pd.DataFrame, batch_size, return_indexes
) -> Generator[List[Dict[str, Any]], None, None]: ...
```
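The helper carries no docstring of its own; as a minimal illustration, it yields the DataFrame in `batch_size`-sized lists of record dicts, pairing each slice with its starting row index when `return_indexes` is true:

```python
import pandas as pd
from pinecone_datasets.dataset import iter_pandas_dataframe_slices

df = pd.DataFrame({"id": ["a", "b", "c"], "values": [[0.1], [0.2], [0.3]]})

for start, batch in iter_pandas_dataframe_slices(df, batch_size=2, return_indexes=True):
    print(start, [record["id"] for record in batch])
# 0 ['a', 'b']
# 2 ['c']
```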
```python
class Dataset(object): ...
```
(full source shown in the module listing above)
```python
def __init__(self, dataset_path: str, **kwargs) -> None: ...
```
Dataset class to load and query datasets from the Pinecone Datasets catalog.

See `from_path` and `from_catalog` for examples of how to load a dataset.

Examples:

```python
from pinecone_datasets import Dataset

dataset = Dataset.from_catalog("dataset_name")
# or
dataset = Dataset.from_path("gs://my-bucket/my-dataset")

for batch in dataset.iter_documents(batch_size=100):
    index.upsert(batch)

for query in dataset.iter_queries():
    results = index.query(**query)
    # do something with the results

# or work with the underlying DataFrames directly
dataset.documents  # returns a pandas DataFrame
dataset.queries    # returns a pandas DataFrame
```
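`Dataset` also exposes a few convenience accessors (`__len__`, `__getitem__`, and `head` in the source above), for example:

```python
len(dataset)          # number of documents
dataset["documents"]  # same as dataset.documents
dataset["queries"]    # same as dataset.queries
dataset.head(3)       # first 3 document rows
```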
```python
@classmethod
def from_path(cls, dataset_path, **kwargs): ...
```
Create a Dataset object from local or cloud storage
Arguments:
- dataset_path (str): a local or cloud storage path containing a valid dataset.
Returns:
Dataset: a Dataset object
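For example, assuming the placeholder paths below point at a valid dataset layout (documents/, optionally queries/, and metadata.json):

```python
from pinecone_datasets import Dataset

dataset = Dataset.from_path("/data/my-dataset")              # local path (placeholder)
# dataset = Dataset.from_path("s3://my-bucket/my-dataset")   # or any fsspec-supported cloud path

print(len(dataset))
print(dataset.head())
```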
```python
@classmethod
def from_catalog(cls, dataset_id, catalog_base_path: str = "", **kwargs): ...
```
Load a dataset from Pinecone's Datasets catalog, or from your own endpoint.
Arguments:
- dataset_id (str): the id of the dataset to load within a catalog
- catalog_base_path (str): the catalog's base path. Defaults to the DATASETS_CATALOG_BASEPATH environment variable; if neither is set, Pinecone's public catalog is used.
Returns:
Dataset: a Dataset object
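For example (the dataset id below is illustrative; check the catalog for the ids actually available):

```python
import os
from pinecone_datasets import Dataset

# load from Pinecone's public catalog
dataset = Dataset.from_catalog("quora_all-MiniLM-L6-bm25")

# or point at your own catalog (placeholder bucket), either via the argument ...
dataset = Dataset.from_catalog("my-dataset-v1", catalog_base_path="gs://my-company-datasets")
# ... or via the environment variable
os.environ["DATASETS_CATALOG_BASEPATH"] = "gs://my-company-datasets"
dataset = Dataset.from_catalog("my-dataset-v1")
```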
```python
@classmethod
def from_pandas(
    cls,
    documents: pd.DataFrame,
    metadata: DatasetMetadata,
    documents_column_mapping: Optional[Dict] = None,
    queries: Optional[pd.DataFrame] = None,
    queries_column_mapping: Optional[Dict] = None,
    **kwargs,
) -> "Dataset": ...
```
Create a Dataset object from a pandas DataFrame
Arguments:
- documents (pd.DataFrame): a pandas DataFrame containing the documents
- metadata (DatasetMetadata): the metadata describing the dataset
- documents_column_mapping (Dict): a dictionary mapping the columns of the documents DataFrame to the Pinecone Datasets Schema
- queries (pd.DataFrame): a pandas DataFrame containing the queries
- queries_column_mapping (Dict): a dictionary mapping the columns of the queries DataFrame to the Pinecone Datasets Schema
Keyword Args:
kwargs (Dict): additional arguments to pass to the fsspec constructor
Returns:
Dataset: a Dataset object
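A small end-to-end sketch. The documents layout assumes the documents schema expects at least `id`, `values`, and `metadata` columns, and the `DatasetMetadata` field values are purely illustrative; the model is defined in `pinecone_datasets.catalog` and its exact required fields may differ:

```python
import pandas as pd
from pinecone_datasets import Dataset
from pinecone_datasets.catalog import DatasetMetadata

documents = pd.DataFrame(
    [
        {"id": "doc-1", "values": [0.1, 0.2, 0.3], "metadata": {"topic": "demo"}},
        {"id": "doc-2", "values": [0.4, 0.5, 0.6], "metadata": {"topic": "demo"}},
    ]
)

# illustrative metadata; must satisfy the pydantic schema in pinecone_datasets.catalog
metadata = DatasetMetadata(
    name="my-tiny-dataset",
    created_at="2023-06-01 00:00:00",
    documents=len(documents),
    queries=0,
    dense_model={"name": "my-embedding-model", "dimension": 3},
)

ds = Dataset.from_pandas(documents, metadata=metadata)
print(len(ds), list(ds.documents.columns))
```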
```python
def iter_documents(
    self, batch_size: int = 1, return_indexes=False
) -> Iterator[List[Dict[str, Any]]]: ...
```
Iterates over the documents in the dataset.
Arguments:
- batch_size (int, optional): The batch size to use for the iterator. Defaults to 1.
- return_indexes (bool, optional): when True, yields (start_index, batch) tuples instead of plain batches. Defaults to False.
Returns:
Iterator[List[Dict[str, Any]]]: An iterator over the documents in the dataset.
Examples:
```python
for batch in dataset.iter_documents(batch_size=100):
    index.upsert(batch)
```
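With `return_indexes=True` each item is a `(start_index, batch)` tuple, which is handy for tracking progress. A sketch, assuming `index` is an existing `pinecone.Index` and the documents follow the usual id/values/metadata layout:

```python
for start, batch in dataset.iter_documents(batch_size=100, return_indexes=True):
    index.upsert(vectors=batch)  # each batch is a list of record dicts
    print(f"upserted rows {start}..{start + len(batch) - 1}")
```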
```python
def iter_queries(self) -> Iterator[Dict[str, Any]]: ...
```
Iterates over the queries in the dataset.
Returns:
Iterator[Dict[str, Any]]: An iterator over the queries in the dataset.
Examples:
```python
for query in dataset.iter_queries():
    results = index.query(**query)
    # do something with the results
```
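A sketch of evaluating every query against an existing index (assumes `index` is a `pinecone.Index`; each query dict carries the fields from the queries schema, such as the query vector and an optional filter):

```python
match_ids = []
for query in dataset.iter_queries():
    res = index.query(**query)  # pass top_k=... here if your queries do not already include it
    match_ids.append([m.id for m in res.matches])
```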
```python
def to_path(self, dataset_path: str, **kwargs): ...
```
Saves the dataset to a local or cloud storage path.
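For example (the bucket name is a placeholder; any fsspec-supported destination should work):

```python
# writes documents/part-0.parquet, queries/part-0.parquet (only if queries exist),
# and metadata.json under the target path
dataset.to_path("/tmp/my-dataset")
dataset.to_path("gs://my-bucket/my-dataset")
```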
```python
def to_catalog(self, dataset_id: str, catalog_base_path: str = "", **kwargs): ...
```
Saves the dataset to a catalog: either the given `catalog_base_path`, the DATASETS_CATALOG_BASEPATH environment variable, or Pinecone's public catalog endpoint.
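A short sketch of publishing to your own catalog location (the bucket path is a placeholder):

```python
import os

# the catalog base path can also be passed explicitly via catalog_base_path
os.environ["DATASETS_CATALOG_BASEPATH"] = "gs://my-company-datasets"
dataset.to_catalog("my-dataset-v1")
# equivalent to: dataset.to_path("gs://my-company-datasets/my-dataset-v1")
```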
```python
def to_pinecone_index(
    self,
    index_name: str,
    namespace: Optional[str] = "",
    should_create_index: bool = True,
    batch_size: int = 100,
    show_progress: bool = True,
    api_key: Optional[str] = None,
    environment: Optional[str] = None,
    **kwargs,
): ...
```
Saves the dataset to a Pinecone index.
This function will look for two environment variables:
- PINECONE_API_KEY
- PINECONE_ENVIRONMENT

Then it will initialize a Pinecone client and perform an upsert to the index. The upsert is performed in batches to increase performance.
Arguments:
- index_name (str): the name of the index to upsert to
- namespace (str, optional): the namespace to use for the upsert. Defaults to "".
- batch_size (int, optional): the batch size to use for the upsert. Defaults to 100.
- show_progress (bool, optional): whether to show a progress bar while upserting. Defaults to True.
- should_create_index (bool, optional): whether to create a new index before upserting. Defaults to True. If False, the index must already exist.
- api_key (str, optional): the Pinecone API key. Defaults to the PINECONE_API_KEY environment variable.
- environment (str, optional): the Pinecone environment. Defaults to the PINECONE_ENVIRONMENT environment variable.
Keyword Args:
kwargs (Dict): additional arguments passed to the Pinecone client when creating the index. See the available parameters here: https://docs.pinecone.io/reference/create_index
Returns:
UpsertResponse: an object containing the upserted_count
Examples:
```python
result = dataset.to_pinecone_index(index_name="my_index")
```
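A slightly fuller sketch, assuming PINECONE_API_KEY and PINECONE_ENVIRONMENT are set in the environment; the dataset id, index names, and the extra `metric` argument are illustrative:

```python
from pinecone_datasets import Dataset

dataset = Dataset.from_catalog("quora_all-MiniLM-L6-bm25")  # illustrative dataset id

# create a fresh index (dimension is taken from dataset.metadata.dense_model.dimension)
result = dataset.to_pinecone_index(
    index_name="my-new-index",
    batch_size=300,
    metric="cosine",  # forwarded to create_index via **kwargs
)
print(result["upserted_count"])  # the current implementation returns a dict

# or upsert into an index that already exists
result = dataset.to_pinecone_index(index_name="my-existing-index", should_create_index=False)
```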