{
 "metadata": {
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.10"
  },
  "orig_nbformat": 2,
  "kernelspec": {
   "name": "python3610jvsc74a57bd0eb5e09632d6ea1cbf3eb9da7e37b7cf581db5ed13074b21cc44e159dc62acdab",
   "display_name": "Python 3.6.10 64-bit ('dataloader': conda)"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2,
 "cells": [
  {
   "source": [
    "## \\[RFC\\] How DataFrames (DF) and DataPipes (DP) work together"
   ],
   "cell_type": "markdown",
   "metadata": {}
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "from importlib import reload\n",
    "import torch\n",
    "reload(torch)\n",
    "from torch.utils.data import IterDataPipe"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Example IterDataPipe\n",
    "class ExampleIterPipe(IterDataPipe):\n",
    "    def __init__(self, range = 20):\n",
    "        self.range = range\n",
    "    def __iter__(self):\n",
    "        for i in range(self.range):\n",
    "            yield i\n",
    "\n",
    "def get_dataframes_pipe(range = 10, dataframe_size = 7):\n",
    "    return ExampleIterPipe(range = range).map(lambda i: (i, i % 3))._to_dataframes_pipe(columns = ['i','j'], dataframe_size = dataframe_size)\n",
    "\n",
    "def get_regular_pipe(range = 10):\n",
    "    return ExampleIterPipe(range = range).map(lambda i: (i, i % 3))\n"
   ]
  },
  {
   "source": [
    "It doesn't matter how the DF is composed internally; the iterator over a DF Pipe gives single rows to the user. This is similar to a regular DataPipe."
   ],
   "cell_type": "markdown",
   "metadata": {}
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "output_type": "stream",
     "name": "stdout",
     "text": [
      "DataFrames Pipe\n(0, 0)\n(1, 1)\n(2, 2)\n(3, 0)\n(4, 1)\n(5, 2)\n(6, 0)\n(7, 1)\n(8, 2)\n(9, 0)\nRegular DataPipe\n(0, 0)\n(1, 1)\n(2, 2)\n(3, 0)\n(4, 1)\n(5, 2)\n(6, 0)\n(7, 1)\n(8, 2)\n(9, 0)\n"
     ]
    }
   ],
   "source": [
    "print('DataFrames Pipe')\n",
    "dp = get_dataframes_pipe()\n",
    "for i in dp:\n",
    "    print(i)\n",
    "\n",
    "print('Regular DataPipe')\n",
    "dp = get_regular_pipe()\n",
    "for i in dp:\n",
    "    print(i)"
   ]
  },
  {
   "source": [
    "You can iterate over a raw DF using `raw_iterator`"
   ],
   "cell_type": "markdown",
   "metadata": {}
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "output_type": "stream",
     "name": "stdout",
     "text": [
      " i j\n0 0 0\n1 1 1\n2 2 2\n3 3 0\n4 4 1\n5 5 2\n6 6 0\n i j\n0 7 1\n1 8 2\n2 9 0\n"
     ]
    }
   ],
   "source": [
    "dp = get_dataframes_pipe()\n",
    "for i in dp.raw_iterator():\n",
    "    print(i)"
   ]
  },
  {
   "source": [
    "Operations over a DF Pipe are captured"
   ],
   "cell_type": "markdown",
   "metadata": {}
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {
    "tags": []
   },
   "outputs": [
    {
     "output_type": "stream",
     "name": "stdout",
     "text": [
      "var_3 = input_var_2.i * 100\nvar_4 = var_3 + input_var_2.j\nvar_5 = var_4 - 2.7\ninput_var_2[\"y\"] = var_5\n"
     ]
    }
   ],
   "source": [
    "dp = get_dataframes_pipe(dataframe_size = 3)\n",
    "dp['y'] = dp.i * 100 + dp.j - 2.7\n",
    "print(dp.ops_str())\n"
   ]
  },
  {
   "source": [
    "Captured operations are executed on `__next__` calls of the constructed DataPipe"
   ],
   "cell_type": "markdown",
   "metadata": {}
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [
    {
     "output_type": "stream",
     "name": "stdout",
     "text": [
      " i j y\n0 0 0 -2.7\n1 1 1 98.3\n2 2 2 199.3\n i j y\n0 3 0 297.3\n1 4 1 398.3\n2 5 2 499.3\n i j y\n0 6 0 597.3\n1 7 1 698.3\n2 8 2 799.3\n i j y\n0 9 0 897.3\n"
     ]
    }
   ],
   "source": [
    "dp = get_dataframes_pipe(dataframe_size = 3)\n",
    "dp['y'] = dp.i * 100 + dp.j - 2.7\n",
    "for i in dp.raw_iterator():\n",
    "    print(i)"
   ]
  },
  {
   "source": [
    "`shuffle` of a DataFramePipe affects rows in an individual manner"
   ],
   "cell_type": "markdown",
   "metadata": {}
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [
    {
     "output_type": "stream",
     "name": "stdout",
     "text": [
      "Raw DataFrames iterator\n i j\n2 8 2\n2 2 2\n2 5 2\n i j\n1 4 1\n1 1 1\n0 3 0\n i j\n1 7 1\n0 9 0\n0 6 0\n i j\n0 0 0\nRegular DataFrames iterator\n(1, 1)\n(5, 2)\n(8, 2)\n(9, 0)\n(7, 1)\n(6, 0)\n(3, 0)\n(4, 1)\n(0, 0)\n(2, 2)\nRegular iterator\n(5, 2)\n(6, 0)\n(0, 0)\n(9, 0)\n(3, 0)\n(1, 1)\n(2, 2)\n(8, 2)\n(4, 1)\n(7, 1)\n"
     ]
    }
   ],
   "source": [
    "dp = get_dataframes_pipe(dataframe_size = 3)\n",
    "dp = dp.shuffle()\n",
    "print('Raw DataFrames iterator')\n",
    "for i in dp.raw_iterator():\n",
    "    print(i)\n",
    "\n",
    "print('Regular DataFrames iterator')\n",
    "for i in dp:\n",
    "    print(i)\n",
    "\n",
    "\n",
    "# this is similar to shuffle of regular DataPipe\n",
    "dp = get_regular_pipe()\n",
    "dp = dp.shuffle()\n",
    "print('Regular iterator')\n",
    "for i in dp:\n",
    "    print(i)"
   ]
  },
  {
   "source": [
    "You can continue mixing DF and DP operations"
   ],
   "cell_type": "markdown",
   "metadata": {}
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [
    {
     "output_type": "stream",
     "name": "stdout",
     "text": [
      " i j y\n0 -17 -17 -197000.0\n1 -13 -16 3813000.0\n0 -11 -17 5803000.0\n i j y\n2 -12 -15 4823000.0\n1 -10 -16 6813000.0\n1 -16 -16 813000.0\n i j y\n0 -8 -17 8803000.0\n2 -9 -15 7823000.0\n0 -14 -17 2803000.0\n i j y\n2 -15 -15 1823000.0\n"
     ]
    }
   ],
   "source": [
    "dp = get_dataframes_pipe(dataframe_size = 3)\n",
    "dp['y'] = dp.i * 100 + dp.j - 2.7\n",
    "dp = dp.shuffle()\n",
    "dp = dp - 17\n",
    "dp['y'] = dp.y * 10000\n",
    "for i in dp.raw_iterator():\n",
    "    print(i)"
   ]
  },
  {
   "source": [
    "Batching combines everything into a `list`; it is possible to nest `list`s. A list may have any number of DataFrames as long as the total number of rows equals the batch size."
   ],
   "cell_type": "markdown",
   "metadata": {}
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [
    {
     "output_type": "stream",
     "name": "stdout",
     "text": [
      "Iterate over DataFrame batches\n[(6, 0),(0, 0)]\n[(4, 1),(1, 1)]\n[(2, 2),(9, 0)]\n[(3, 0),(5, 2)]\n[(7, 1),(8, 2)]\nIterate over regular batches\n[(1, 1),(4, 1)]\n[(2, 2),(3, 0)]\n[(6, 0),(7, 1)]\n[(8, 2),(0, 0)]\n[(5, 2),(9, 0)]\n"
     ]
    }
   ],
   "source": [
    "dp = get_dataframes_pipe(dataframe_size = 3)\n",
    "dp = dp.shuffle()\n",
    "dp = dp.batch(2)\n",
    "print(\"Iterate over DataFrame batches\")\n",
    "for i,v in enumerate(dp):\n",
    "    print(v)\n",
    "\n",
    "# this is similar to batching of regular DataPipe\n",
    "dp = get_regular_pipe()\n",
    "dp = dp.shuffle()\n",
    "dp = dp.batch(2)\n",
    "print(\"Iterate over regular batches\")\n",
    "for i in dp:\n",
    "    print(i)"
   ]
  },
  {
   "source": [
    "Some details about internal storage of batched DataFrames and how they are iterated"
   ],
   "cell_type": "markdown",
   "metadata": {}
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [
    {
     "output_type": "stream",
     "name": "stdout",
     "text": [
      "Type: <class 'torch.utils.data.datapipes.iter.dataframes.DataChunkDF'>\n",
      "As string: [(0, 0),(3, 0)]\n",
      "Iterated regularly:\n",
      "-- batch start --\n",
      "(0, 0)\n",
      "(3, 0)\n",
      "-- batch end --\n",
      "Iterated in inner format (for developers):\n",
      "-- df batch start --\n",
      " i j\n",
      "0 0 0\n",
      "0 3 0\n",
      "-- df batch end --\n",
      "Type: <class 'torch.utils.data.datapipes.iter.dataframes.DataChunkDF'>\n",
      "As string: [(6, 0),(1, 1)]\n",
      "Iterated regularly:\n",
      "-- batch start --\n",
      "(6, 0)\n",
      "(1, 1)\n",
      "-- batch end --\n",
      "Iterated in inner format (for developers):\n",
      "-- df batch start --\n",
      " i j\n",
      "0 6 0\n",
      "1 1 1\n",
      "-- df batch end --\n",
      "Type: <class 'torch.utils.data.datapipes.iter.dataframes.DataChunkDF'>\n",
      "As string: [(9, 0),(4, 1)]\n",
      "Iterated regularly:\n",
      "-- batch start --\n",
      "(9, 0)\n",
      "(4, 1)\n",
      "-- batch end --\n",
      "Iterated in inner format (for developers):\n",
      "-- df batch start --\n",
      " i j\n",
      "0 9 0\n",
      "1 4 1\n",
      "-- df batch end --\n",
      "Type: <class 'torch.utils.data.datapipes.iter.dataframes.DataChunkDF'>\n",
      "As string: [(5, 2),(2, 2)]\n",
      "Iterated regularly:\n",
      "-- batch start --\n",
      "(5, 2)\n",
      "(2, 2)\n",
      "-- batch end --\n",
      "Iterated in inner format (for developers):\n",
      "-- df batch start --\n",
      " i j\n",
      "2 5 2\n",
      "2 2 2\n",
      "-- df batch end --\n",
      "Type: <class 'torch.utils.data.datapipes.iter.dataframes.DataChunkDF'>\n",
      "As string: [(8, 2),(7, 1)]\n",
      "Iterated regularly:\n",
      "-- batch start --\n",
      "(8, 2)\n",
      "(7, 1)\n",
      "-- batch end --\n",
      "Iterated in inner format (for developers):\n",
      "-- df batch start --\n",
      " i j\n",
      "2 8 2\n",
      "1 7 1\n",
      "-- df batch end --\n"
     ]
    }
   ],
   "source": [
    "dp = get_dataframes_pipe(dataframe_size = 3)\n",
    "dp = dp.shuffle()\n",
    "dp = dp.batch(2)\n",
    "for i in dp:\n",
    "    print(\"Type: \", type(i))\n",
    "    print(\"As string: \", i)\n",
    "    print(\"Iterated regularly:\")\n",
    "    print('-- batch start --')\n",
    "    for item in i:\n",
    "        print(item)\n",
    "    print('-- batch end --')\n",
    "    print(\"Iterated in inner format (for developers):\")\n",
    "    print('-- df batch start --')\n",
    "    for item in i.raw_iterator():\n",
    "        print(item)\n",
    "    print('-- df batch end --') "
   ]
  },
  {
   "source": [
    "`concat` should work only on DFs with the same schema; this code should produce an error."
   ],
   "cell_type": "markdown",
   "metadata": {}
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [],
   "source": [
    "# TODO!\n",
    "# dp0 = get_dataframes_pipe(range = 8, dataframe_size = 4)\n",
    "# dp = get_dataframes_pipe(range = 6, dataframe_size = 3)\n",
    "# dp['y'] = dp.i * 100 + dp.j - 2.7\n",
    "# dp = dp.concat(dp0)\n",
    "# for i,v in enumerate(dp.raw_iterator()):\n",
    "#     print(v)"
   ]
  },
  {
   "source": [
    "`unbatch` of a `list` with DataFrames works similarly to regular unbatch.\n",
    "Note: DataFrame sizes might change"
   ],
   "cell_type": "markdown",
   "metadata": {}
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {},
   "outputs": [
    {
     "output_type": "error",
     "ename": "AttributeError",
     "evalue": "",
     "traceback": [
      "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m",
      "\u001b[0;31mAttributeError\u001b[0m Traceback (most recent call last)",
      "\u001b[0;32m<ipython-input-12-fa80e9c68655>\u001b[0m in \u001b[0;36m<module>\u001b[0;34m\u001b[0m\n\u001b[1;32m 4\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 5\u001b[0m \u001b[0;31m# Here is bug with unbatching which doesn't detect DF type.\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m----> 6\u001b[0;31m \u001b[0mdp\u001b[0m\u001b[0;34m[\u001b[0m\u001b[0;34m'z'\u001b[0m\u001b[0;34m]\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mdp\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0my\u001b[0m \u001b[0;34m-\u001b[0m \u001b[0;36m100\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 7\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 8\u001b[0m \u001b[0;32mfor\u001b[0m \u001b[0mi\u001b[0m \u001b[0;32min\u001b[0m \u001b[0mdp\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mraw_iterator\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
      "\u001b[0;32m~/dataset/pytorch/torch/utils/data/dataset.py\u001b[0m in \u001b[0;36m__getattr__\u001b[0;34m(self, attribute_name)\u001b[0m\n\u001b[1;32m 222\u001b[0m \u001b[0;32mreturn\u001b[0m \u001b[0mfunction\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 223\u001b[0m \u001b[0;32melse\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 224\u001b[0;31m \u001b[0;32mraise\u001b[0m \u001b[0mAttributeError\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 225\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 226\u001b[0m \u001b[0;32mdef\u001b[0m \u001b[0m__reduce_ex__\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m*\u001b[0m\u001b[0margs\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m**\u001b[0m\u001b[0mkwargs\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
      "\u001b[0;31mAttributeError\u001b[0m: "
     ]
    }
   ],
   "source": [
    "dp = get_dataframes_pipe(range = 18, dataframe_size = 3)\n",
    "dp['y'] = dp.i * 100 + dp.j - 2.7\n",
    "dp = dp.batch(5).batch(3).batch(1).unbatch(unbatch_level = 3)\n",
    "\n",
    "# Here is bug with unbatching which doesn't detect DF type.\n",
    "dp['z'] = dp.y - 100\n",
    "\n",
    "for i in dp.raw_iterator():\n",
    "    print(i)"
   ]
  },
  {
   "source": [
    "`map` is applied to individual rows; the `nesting_level` argument is used to penetrate batching"
   ],
   "cell_type": "markdown",
   "metadata": {}
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {},
   "outputs": [
    {
     "output_type": "stream",
     "name": "stdout",
     "text": [
      "Iterate over DataFrame batches\n[(1111000, 1111000),(1112000, 1112000),(1113000, 1113000),(1114000, 1111000),(1115000, 1112000)]\n[(1116000, 1113000),(1117000, 1111000),(1118000, 1112000),(1119000, 1113000),(1120000, 1111000)]\nIterate over regular batches\n[(1111000, 0),(1112000, 1),(1113000, 2),(1114000, 0),(1115000, 1)]\n[(1116000, 2),(1117000, 0),(1118000, 1),(1119000, 2),(1120000, 0)]\n"
     ]
    }
   ],
   "source": [
    "dp = get_dataframes_pipe(range = 10, dataframe_size = 3)\n",
    "dp = dp.map(lambda x: x + 1111)\n",
    "dp = dp.batch(5).map(lambda x: x * 1000, nesting_level = 1)\n",
    "print(\"Iterate over DataFrame batches\")\n",
    "for i in dp:\n",
    "    print(i)\n",
    "\n",
    "# Similarly works on row level for classic DataPipe elements\n",
    "dp = get_regular_pipe(range = 10)\n",
    "dp = dp.map(lambda x: (x[0] + 1111, x[1]))\n",
    "dp = dp.batch(5).map(lambda x: (x[0] * 1000, x[1]), nesting_level = 1)\n",
    "print(\"Iterate over regular batches\")\n",
    "for i in dp:\n",
    "    print(i)\n",
    "\n"
   ]
  },
  {
   "source": [
    "`filter` is applied to individual rows; the `nesting_level` argument is used to penetrate batching"
   ],
   "cell_type": "markdown",
   "metadata": {}
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "metadata": {},
   "outputs": [
    {
     "output_type": "stream",
     "name": "stdout",
     "text": [
      "Iterate over DataFrame batches\n[(6, 0),(7, 1),(8, 2),(9, 0),(10, 1)]\n[(11, 2),(12, 0)]\nIterate over regular batches\n[(6, 0),(7, 1),(8, 2),(9, 0),(10, 1)]\n[(11, 2),(12, 0)]\n"
     ]
    }
   ],
   "source": [
    "dp = get_dataframes_pipe(range = 30, dataframe_size = 3)\n",
    "dp = dp.filter(lambda x: x.i > 5)\n",
    "dp = dp.batch(5).filter(lambda x: x.i < 13, nesting_level = 1)\n",
    "print(\"Iterate over DataFrame batches\")\n",
    "for i in dp:\n",
    "    print(i)\n",
    "\n",
    "# Similarly works on row level for classic DataPipe elements\n",
    "dp = get_regular_pipe(range = 30)\n",
    "dp = dp.filter(lambda x: x[0] > 5)\n",
    "dp = dp.batch(5).filter(lambda x: x[0] < 13, nesting_level = 1)\n",
    "print(\"Iterate over regular batches\")\n",
    "for i in dp:\n",
    "    print(i)"
   ]
  }
 ]
}