xref: /aosp_15_r20/external/pytorch/torch/utils/data/dataframes_pipes.ipynb (revision da0073e96a02ea20f0ac840b70461e3646d07c45)
1{
2 "metadata": {
3  "language_info": {
4   "codemirror_mode": {
5    "name": "ipython",
6    "version": 3
7   },
8   "file_extension": ".py",
9   "mimetype": "text/x-python",
10   "name": "python",
11   "nbconvert_exporter": "python",
12   "pygments_lexer": "ipython3",
13   "version": "3.6.10"
14  },
15  "orig_nbformat": 2,
16  "kernelspec": {
17   "name": "python3610jvsc74a57bd0eb5e09632d6ea1cbf3eb9da7e37b7cf581db5ed13074b21cc44e159dc62acdab",
18   "display_name": "Python 3.6.10 64-bit ('dataloader': conda)"
19  }
20 },
21 "nbformat": 4,
22 "nbformat_minor": 2,
23 "cells": [
24  {
25   "source": [
26    "## \\[RFC\\] How DataFrames (DF) and DataPipes (DP) work together"
27   ],
28   "cell_type": "markdown",
29   "metadata": {}
30  },
31  {
32   "cell_type": "code",
33   "execution_count": 1,
34   "metadata": {},
35   "outputs": [],
36   "source": [
37    "from importlib import reload\n",
38    "import torch\n",
39    "reload(torch)\n",
40    "from torch.utils.data import IterDataPipe"
41   ]
42  },
43  {
44   "cell_type": "code",
45   "execution_count": 2,
46   "metadata": {},
47   "outputs": [],
48   "source": [
49    "# Example IterDataPipe\n",
50    "class ExampleIterPipe(IterDataPipe):\n",
51    "    def __init__(self, range = 20):\n",
52    "        self.range = range\n",
53    "    def __iter__(self):\n",
54    "        for i in range(self.range):\n",
55    "            yield i\n",
56    "\n",
57    "def get_dataframes_pipe(range = 10, dataframe_size = 7):\n",
58    "    return ExampleIterPipe(range = range).map(lambda i: (i, i % 3))._to_dataframes_pipe(columns = ['i','j'], dataframe_size = dataframe_size)\n",
59    "\n",
60    "def get_regular_pipe(range = 10):\n",
61    "    return ExampleIterPipe(range = range).map(lambda i: (i, i % 3))\n"
62   ]
63  },
64  {
65   "source": [
66    "It doesn't matter how the DF is composed internally; iterating over a DF Pipe gives single rows to the user. This is similar to a regular DataPipe."
67   ],
68   "cell_type": "markdown",
69   "metadata": {}
70  },
71  {
72   "cell_type": "code",
73   "execution_count": 3,
74   "metadata": {},
75   "outputs": [
76    {
77     "output_type": "stream",
78     "name": "stdout",
79     "text": [
80      "DataFrames Pipe\n(0, 0)\n(1, 1)\n(2, 2)\n(3, 0)\n(4, 1)\n(5, 2)\n(6, 0)\n(7, 1)\n(8, 2)\n(9, 0)\nRegular DataPipe\n(0, 0)\n(1, 1)\n(2, 2)\n(3, 0)\n(4, 1)\n(5, 2)\n(6, 0)\n(7, 1)\n(8, 2)\n(9, 0)\n"
81     ]
82    }
83   ],
84   "source": [
85    "print('DataFrames Pipe')\n",
86    "dp = get_dataframes_pipe()\n",
87    "for i in dp:\n",
88    "    print(i)\n",
89    "\n",
90    "print('Regular DataPipe')\n",
91    "dp = get_regular_pipe()\n",
92    "for i in dp:\n",
93    "    print(i)"
94   ]
95  },
96  {
97   "source": [
98    "You can iterate over the raw DF using `raw_iterator`"
99   ],
100   "cell_type": "markdown",
101   "metadata": {}
102  },
103  {
104   "cell_type": "code",
105   "execution_count": 4,
106   "metadata": {},
107   "outputs": [
108    {
109     "output_type": "stream",
110     "name": "stdout",
111     "text": [
112      "   i  j\n0  0  0\n1  1  1\n2  2  2\n3  3  0\n4  4  1\n5  5  2\n6  6  0\n   i  j\n0  7  1\n1  8  2\n2  9  0\n"
113     ]
114    }
115   ],
116   "source": [
117    "dp = get_dataframes_pipe()\n",
118    "for i in dp.raw_iterator():\n",
119    "    print(i)"
120   ]
121  },
122  {
123   "source": [
124    "Operations over a DF Pipe are captured"
125   ],
126   "cell_type": "markdown",
127   "metadata": {}
128  },
129  {
130   "cell_type": "code",
131   "execution_count": 5,
132   "metadata": {
133    "tags": []
134   },
135   "outputs": [
136    {
137     "output_type": "stream",
138     "name": "stdout",
139     "text": [
140      "var_3 = input_var_2.i * 100\nvar_4 = var_3 + input_var_2.j\nvar_5 = var_4 - 2.7\ninput_var_2[\"y\"] = var_5\n"
141     ]
142    }
143   ],
144   "source": [
145    "dp = get_dataframes_pipe(dataframe_size = 3)\n",
146    "dp['y'] = dp.i * 100 + dp.j - 2.7\n",
147    "print(dp.ops_str())\n"
148   ]
149  },
150  {
151   "source": [
152    "Captured operations are executed on `__next__` calls of the constructed DataPipe"
153   ],
154   "cell_type": "markdown",
155   "metadata": {}
156  },
157  {
158   "cell_type": "code",
159   "execution_count": 6,
160   "metadata": {},
161   "outputs": [
162    {
163     "output_type": "stream",
164     "name": "stdout",
165     "text": [
166      "   i  j      y\n0  0  0   -2.7\n1  1  1   98.3\n2  2  2  199.3\n   i  j      y\n0  3  0  297.3\n1  4  1  398.3\n2  5  2  499.3\n   i  j      y\n0  6  0  597.3\n1  7  1  698.3\n2  8  2  799.3\n   i  j      y\n0  9  0  897.3\n"
167     ]
168    }
169   ],
170   "source": [
171    "dp = get_dataframes_pipe(dataframe_size = 3)\n",
172    "dp['y'] = dp.i * 100 + dp.j - 2.7\n",
173    "for i in dp.raw_iterator():\n",
174    "    print(i)"
175   ]
176  },
177  {
178   "source": [
179    "`shuffle` of a DataFramePipe affects rows in an individual manner"
180   ],
181   "cell_type": "markdown",
182   "metadata": {}
183  },
184  {
185   "cell_type": "code",
186   "execution_count": 7,
187   "metadata": {},
188   "outputs": [
189    {
190     "output_type": "stream",
191     "name": "stdout",
192     "text": [
193      "Raw DataFrames iterator\n   i  j\n2  8  2\n2  2  2\n2  5  2\n   i  j\n1  4  1\n1  1  1\n0  3  0\n   i  j\n1  7  1\n0  9  0\n0  6  0\n   i  j\n0  0  0\nRegular DataFrames iterator\n(1, 1)\n(5, 2)\n(8, 2)\n(9, 0)\n(7, 1)\n(6, 0)\n(3, 0)\n(4, 1)\n(0, 0)\n(2, 2)\nRegular iterator\n(5, 2)\n(6, 0)\n(0, 0)\n(9, 0)\n(3, 0)\n(1, 1)\n(2, 2)\n(8, 2)\n(4, 1)\n(7, 1)\n"
194     ]
195    }
196   ],
197   "source": [
198    "dp = get_dataframes_pipe(dataframe_size = 3)\n",
199    "dp = dp.shuffle()\n",
200    "print('Raw DataFrames iterator')\n",
201    "for i in dp.raw_iterator():\n",
202    "    print(i)\n",
203    "\n",
204    "print('Regular DataFrames iterator')\n",
205    "for i in dp:\n",
206    "    print(i)\n",
207    "\n",
208    "\n",
209    "# this is similar to shuffle of regular DataPipe\n",
210    "dp = get_regular_pipe()\n",
211    "dp = dp.shuffle()\n",
212    "print('Regular iterator')\n",
213    "for i in dp:\n",
214    "    print(i)"
215   ]
216  },
217  {
218   "source": [
219    "You can continue mixing DF and DP operations"
220   ],
221   "cell_type": "markdown",
222   "metadata": {}
223  },
224  {
225   "cell_type": "code",
226   "execution_count": 8,
227   "metadata": {},
228   "outputs": [
229    {
230     "output_type": "stream",
231     "name": "stdout",
232     "text": [
233      "    i   j          y\n0 -17 -17  -197000.0\n1 -13 -16  3813000.0\n0 -11 -17  5803000.0\n    i   j          y\n2 -12 -15  4823000.0\n1 -10 -16  6813000.0\n1 -16 -16   813000.0\n    i   j          y\n0  -8 -17  8803000.0\n2  -9 -15  7823000.0\n0 -14 -17  2803000.0\n    i   j          y\n2 -15 -15  1823000.0\n"
234     ]
235    }
236   ],
237   "source": [
238    "dp = get_dataframes_pipe(dataframe_size = 3)\n",
239    "dp['y'] = dp.i * 100 + dp.j - 2.7\n",
240    "dp = dp.shuffle()\n",
241    "dp = dp - 17\n",
242    "dp['y'] = dp.y * 10000\n",
243    "for i in dp.raw_iterator():\n",
244    "    print(i)"
245   ]
246  },
247  {
248   "source": [
249    "Batching combines everything into a `list`; it is possible to nest `list`s. A list may have any number of DataFrames as long as the total number of rows equals the batch size."
250   ],
251   "cell_type": "markdown",
252   "metadata": {}
253  },
254  {
255   "cell_type": "code",
256   "execution_count": 9,
257   "metadata": {},
258   "outputs": [
259    {
260     "output_type": "stream",
261     "name": "stdout",
262     "text": [
263      "Iterate over DataFrame batches\n[(6, 0),(0, 0)]\n[(4, 1),(1, 1)]\n[(2, 2),(9, 0)]\n[(3, 0),(5, 2)]\n[(7, 1),(8, 2)]\nIterate over regular batches\n[(1, 1),(4, 1)]\n[(2, 2),(3, 0)]\n[(6, 0),(7, 1)]\n[(8, 2),(0, 0)]\n[(5, 2),(9, 0)]\n"
264     ]
265    }
266   ],
267   "source": [
268    "dp = get_dataframes_pipe(dataframe_size = 3)\n",
269    "dp = dp.shuffle()\n",
270    "dp = dp.batch(2)\n",
271    "print(\"Iterate over DataFrame batches\")\n",
272    "for i,v in enumerate(dp):\n",
273    "    print(v)\n",
274    "\n",
275    "# this is similar to batching of regular DataPipe\n",
276    "dp = get_regular_pipe()\n",
277    "dp = dp.shuffle()\n",
278    "dp = dp.batch(2)\n",
279    "print(\"Iterate over regular batches\")\n",
280    "for i in dp:\n",
281    "    print(i)"
282   ]
283  },
284  {
285   "source": [
286    "Some details about internal storage of batched DataFrames and how they are iterated"
287   ],
288   "cell_type": "markdown",
289   "metadata": {}
290  },
291  {
292   "cell_type": "code",
293   "execution_count": 10,
294   "metadata": {},
295   "outputs": [
296    {
297     "output_type": "stream",
298     "name": "stdout",
299     "text": [
300      "Type:  <class 'torch.utils.data.datapipes.iter.dataframes.DataChunkDF'>\n",
301      "As string:  [(0, 0),(3, 0)]\n",
302      "Iterated regularly:\n",
303      "-- batch start --\n",
304      "(0, 0)\n",
305      "(3, 0)\n",
306      "-- batch end --\n",
307      "Iterated in inner format (for developers):\n",
308      "-- df batch start --\n",
309      "   i  j\n",
310      "0  0  0\n",
311      "0  3  0\n",
312      "-- df batch end --\n",
313      "Type:  <class 'torch.utils.data.datapipes.iter.dataframes.DataChunkDF'>\n",
314      "As string:  [(6, 0),(1, 1)]\n",
315      "Iterated regularly:\n",
316      "-- batch start --\n",
317      "(6, 0)\n",
318      "(1, 1)\n",
319      "-- batch end --\n",
320      "Iterated in inner format (for developers):\n",
321      "-- df batch start --\n",
322      "   i  j\n",
323      "0  6  0\n",
324      "1  1  1\n",
325      "-- df batch end --\n",
326      "Type:  <class 'torch.utils.data.datapipes.iter.dataframes.DataChunkDF'>\n",
327      "As string:  [(9, 0),(4, 1)]\n",
328      "Iterated regularly:\n",
329      "-- batch start --\n",
330      "(9, 0)\n",
331      "(4, 1)\n",
332      "-- batch end --\n",
333      "Iterated in inner format (for developers):\n",
334      "-- df batch start --\n",
335      "   i  j\n",
336      "0  9  0\n",
337      "1  4  1\n",
338      "-- df batch end --\n",
339      "Type:  <class 'torch.utils.data.datapipes.iter.dataframes.DataChunkDF'>\n",
340      "As string:  [(5, 2),(2, 2)]\n",
341      "Iterated regularly:\n",
342      "-- batch start --\n",
343      "(5, 2)\n",
344      "(2, 2)\n",
345      "-- batch end --\n",
346      "Iterated in inner format (for developers):\n",
347      "-- df batch start --\n",
348      "   i  j\n",
349      "2  5  2\n",
350      "2  2  2\n",
351      "-- df batch end --\n",
352      "Type:  <class 'torch.utils.data.datapipes.iter.dataframes.DataChunkDF'>\n",
353      "As string:  [(8, 2),(7, 1)]\n",
354      "Iterated regularly:\n",
355      "-- batch start --\n",
356      "(8, 2)\n",
357      "(7, 1)\n",
358      "-- batch end --\n",
359      "Iterated in inner format (for developers):\n",
360      "-- df batch start --\n",
361      "   i  j\n",
362      "2  8  2\n",
363      "1  7  1\n",
364      "-- df batch end --\n"
365     ]
366    }
367   ],
368   "source": [
369    "dp = get_dataframes_pipe(dataframe_size = 3)\n",
370    "dp = dp.shuffle()\n",
371    "dp = dp.batch(2)\n",
372    "for i in dp:\n",
373    "    print(\"Type: \", type(i))\n",
374    "    print(\"As string: \", i)\n",
375    "    print(\"Iterated regularly:\")\n",
376    "    print('-- batch start --')\n",
377    "    for item in i:\n",
378    "        print(item)\n",
379    "    print('-- batch end --')\n",
380    "    print(\"Iterated in inner format (for developers):\")\n",
381    "    print('-- df batch start --')\n",
382    "    for item in i.raw_iterator():\n",
383    "        print(item)\n",
384    "    print('-- df batch end --')   "
385   ]
386  },
387  {
388   "source": [
389    "`concat` should work only on DFs with the same schema; this code should produce an error"
390   ],
391   "cell_type": "markdown",
392   "metadata": {}
393  },
394  {
395   "cell_type": "code",
396   "execution_count": 11,
397   "metadata": {},
398   "outputs": [],
399   "source": [
400    "# TODO!\n",
401    "# dp0 = get_dataframes_pipe(range = 8, dataframe_size = 4)\n",
402    "# dp = get_dataframes_pipe(range = 6, dataframe_size = 3)\n",
403    "# dp['y'] = dp.i * 100 + dp.j - 2.7\n",
404    "# dp = dp.concat(dp0)\n",
405    "# for i,v in enumerate(dp.raw_iterator()):\n",
406    "#     print(v)"
407   ]
408  },
409  {
410   "source": [
411    "`unbatch` of a `list` with DataFrames works similarly to regular unbatch.\n",
412    "Note: DataFrame sizes might change"
413   ],
414   "cell_type": "markdown",
415   "metadata": {}
416  },
417  {
418   "cell_type": "code",
419   "execution_count": 12,
420   "metadata": {},
421   "outputs": [
422    {
423     "output_type": "error",
424     "ename": "AttributeError",
425     "evalue": "",
426     "traceback": [
427      "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m",
428      "\u001b[0;31mAttributeError\u001b[0m                            Traceback (most recent call last)",
429      "\u001b[0;32m<ipython-input-12-fa80e9c68655>\u001b[0m in \u001b[0;36m<module>\u001b[0;34m\u001b[0m\n\u001b[1;32m      4\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m      5\u001b[0m \u001b[0;31m# Here is bug with unbatching which doesn't detect DF type.\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m----> 6\u001b[0;31m \u001b[0mdp\u001b[0m\u001b[0;34m[\u001b[0m\u001b[0;34m'z'\u001b[0m\u001b[0;34m]\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mdp\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0my\u001b[0m \u001b[0;34m-\u001b[0m \u001b[0;36m100\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m      7\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m      8\u001b[0m \u001b[0;32mfor\u001b[0m \u001b[0mi\u001b[0m \u001b[0;32min\u001b[0m \u001b[0mdp\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mraw_iterator\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
430      "\u001b[0;32m~/dataset/pytorch/torch/utils/data/dataset.py\u001b[0m in \u001b[0;36m__getattr__\u001b[0;34m(self, attribute_name)\u001b[0m\n\u001b[1;32m    222\u001b[0m             \u001b[0;32mreturn\u001b[0m \u001b[0mfunction\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m    223\u001b[0m         \u001b[0;32melse\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 224\u001b[0;31m             \u001b[0;32mraise\u001b[0m \u001b[0mAttributeError\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m    225\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m    226\u001b[0m     \u001b[0;32mdef\u001b[0m \u001b[0m__reduce_ex__\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m*\u001b[0m\u001b[0margs\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m**\u001b[0m\u001b[0mkwargs\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
431      "\u001b[0;31mAttributeError\u001b[0m: "
432     ]
433    }
434   ],
435   "source": [
436    "dp = get_dataframes_pipe(range = 18, dataframe_size = 3)\n",
437    "dp['y'] = dp.i * 100 + dp.j - 2.7\n",
438    "dp = dp.batch(5).batch(3).batch(1).unbatch(unbatch_level = 3)\n",
439    "\n",
440    "# Here is bug with unbatching which doesn't detect DF type.\n",
441    "dp['z'] = dp.y - 100\n",
442    "\n",
443    "for i in dp.raw_iterator():\n",
444    "    print(i)"
445   ]
446  },
447  {
448   "source": [
449    "`map` is applied to individual rows; the `nesting_level` argument is used to penetrate batching"
450   ],
451   "cell_type": "markdown",
452   "metadata": {}
453  },
454  {
455   "cell_type": "code",
456   "execution_count": 13,
457   "metadata": {},
458   "outputs": [
459    {
460     "output_type": "stream",
461     "name": "stdout",
462     "text": [
463      "Iterate over DataFrame batches\n[(1111000, 1111000),(1112000, 1112000),(1113000, 1113000),(1114000, 1111000),(1115000, 1112000)]\n[(1116000, 1113000),(1117000, 1111000),(1118000, 1112000),(1119000, 1113000),(1120000, 1111000)]\nIterate over regular batches\n[(1111000, 0),(1112000, 1),(1113000, 2),(1114000, 0),(1115000, 1)]\n[(1116000, 2),(1117000, 0),(1118000, 1),(1119000, 2),(1120000, 0)]\n"
464     ]
465    }
466   ],
467   "source": [
468    "dp = get_dataframes_pipe(range = 10, dataframe_size = 3)\n",
469    "dp = dp.map(lambda x: x + 1111)\n",
470    "dp = dp.batch(5).map(lambda x: x * 1000, nesting_level = 1)\n",
471    "print(\"Iterate over DataFrame batches\")\n",
472    "for i in dp:\n",
473    "    print(i)\n",
474    "\n",
475    "# Similarly works on row level for classic DataPipe elements\n",
476    "dp = get_regular_pipe(range = 10)\n",
477    "dp = dp.map(lambda x: (x[0] + 1111, x[1]))\n",
478    "dp = dp.batch(5).map(lambda x: (x[0] * 1000, x[1]), nesting_level = 1)\n",
479    "print(\"Iterate over regular batches\")\n",
480    "for i in dp:\n",
481    "    print(i)\n",
482    "\n"
483   ]
484  },
485  {
486   "source": [
487    "`filter` is applied to individual rows; the `nesting_level` argument is used to penetrate batching"
488   ],
489   "cell_type": "markdown",
490   "metadata": {}
491  },
492  {
493   "cell_type": "code",
494   "execution_count": 14,
495   "metadata": {},
496   "outputs": [
497    {
498     "output_type": "stream",
499     "name": "stdout",
500     "text": [
501      "Iterate over DataFrame batches\n[(6, 0),(7, 1),(8, 2),(9, 0),(10, 1)]\n[(11, 2),(12, 0)]\nIterate over regular batches\n[(6, 0),(7, 1),(8, 2),(9, 0),(10, 1)]\n[(11, 2),(12, 0)]\n"
502     ]
503    }
504   ],
505   "source": [
506    "dp = get_dataframes_pipe(range = 30, dataframe_size = 3)\n",
507    "dp = dp.filter(lambda x: x.i > 5)\n",
508    "dp = dp.batch(5).filter(lambda x: x.i < 13, nesting_level = 1)\n",
509    "print(\"Iterate over DataFrame batches\")\n",
510    "for i in dp:\n",
511    "    print(i)\n",
512    "\n",
513    "# Similarly works on row level for classic DataPipe elements\n",
514    "dp = get_regular_pipe(range = 30)\n",
515    "dp = dp.filter(lambda x: x[0] > 5)\n",
516    "dp = dp.batch(5).filter(lambda x: x[0] < 13, nesting_level = 1)\n",
517    "print(\"Iterate over regular batches\")\n",
518    "for i in dp:\n",
519    "    print(i)"
520   ]
521  }
522 ]
523}