#include <gtest/gtest.h>

#include <c10/util/irange.h>
#include <tensorpipe/common/cpu_buffer.h>
#include <tensorpipe/core/message.h>
#include <torch/csrc/distributed/rpc/tensorpipe_utils.h>
#include <torch/torch.h>

#include <memory>
#include <string>
#include <vector>

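// These tests exercise the TensorPipe serialization helpers used by the RPC
// layer: tensorpipeSerialize() on the sending side, and tensorpipeAllocate()
// plus tensorpipeDeserialize() on the receiving side. The first test drives a
// full round trip in-process, standing in for the transport; the other two
// check how tensor storage is copied (or shared) during serialization.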
TEST(TensorpipeSerialize, Base) {
  // Sender serializes
  at::Tensor t1 = torch::ones({1024}, at::ScalarType::Int);
  at::Tensor t2 = torch::ones({1024}, at::ScalarType::Float);
  std::vector<at::Tensor> tensors{t1, t2};
  std::vector<char> payload = {'1', '2', '3'};
  std::vector<char> payloadCopy = payload; // for testing
  torch::distributed::rpc::MessageType mtype =
      torch::distributed::rpc::MessageType::UNKNOWN;
  int64_t mId = 100;
  auto sendingRpcMessage =
      c10::make_intrusive<torch::distributed::rpc::Message>(
          std::move(payload), std::move(tensors), mtype);
  sendingRpcMessage->setId(mId);
  auto [sendingTpMessage, sendingTpBuffers] =
      torch::distributed::rpc::tensorpipeSerialize(
          std::move(sendingRpcMessage), {}, {});
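  // Note: sendingTpMessage carries raw pointers into storage owned by
  // sendingTpBuffers, so those buffers must stay alive while the (mimicked)
  // transfer below reads from them.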

  // Mimic receiving message descriptor: recvingTpDescriptor is a copy of
  // sendingTpMessage except for the data pointers which are left null.
  tensorpipe::Descriptor recvingTpDescriptor;
  recvingTpDescriptor.metadata = sendingTpMessage.metadata;
  recvingTpDescriptor.payloads.reserve(sendingTpMessage.payloads.size());
  for (auto& tpPayload : sendingTpMessage.payloads) {
    tensorpipe::Descriptor::Payload p;
    p.length = tpPayload.length;
    p.metadata = tpPayload.metadata;
    recvingTpDescriptor.payloads.push_back(std::move(p));
  }
  EXPECT_EQ(
      recvingTpDescriptor.payloads.size(), sendingTpMessage.payloads.size());
  recvingTpDescriptor.tensors.reserve(sendingTpMessage.tensors.size());
  for (auto& tpTensor : sendingTpMessage.tensors) {
    tensorpipe::Descriptor::Tensor t;
    t.length = tpTensor.length;
    t.sourceDevice = tpTensor.buffer.device();
    t.targetDevice = tpTensor.targetDevice;
    t.metadata = tpTensor.metadata;
    recvingTpDescriptor.tensors.push_back(std::move(t));
  }
  EXPECT_EQ(
      recvingTpDescriptor.tensors.size(), sendingTpMessage.tensors.size());

  // Mimic readDescriptor() callback:
  // - Allocate buffers
  // - Fill pointers in tensorpipe message
  auto [recvingTpAllocation, recvingTpBuffers] =
      torch::distributed::rpc::tensorpipeAllocate(recvingTpDescriptor, {});
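  // tensorpipeAllocate() sizes destination buffers from the descriptor;
  // recvingTpBuffers owns that memory and recvingTpAllocation exposes raw
  // pointers into it for the incoming data.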

  // Mimic tensorpipe data transfer
  EXPECT_EQ(
      recvingTpAllocation.payloads.size(), sendingTpMessage.payloads.size());
  for (const auto i : c10::irange(recvingTpAllocation.payloads.size())) {
    tensorpipe::Message::Payload& srcPayload = sendingTpMessage.payloads[i];
    tensorpipe::Allocation::Payload& dstPayload =
        recvingTpAllocation.payloads[i];
    if (srcPayload.length) {
      // Empty vector's data() can return nullptr, use the length to avoid
      // copying into nullptr
      memcpy(dstPayload.data, srcPayload.data, srcPayload.length);
    }
  }
  EXPECT_EQ(
      recvingTpAllocation.tensors.size(), sendingTpMessage.tensors.size());
  for (const auto i : c10::irange(recvingTpAllocation.tensors.size())) {
    tensorpipe::Message::Tensor& srcTensor = sendingTpMessage.tensors[i];
    tensorpipe::Allocation::Tensor& dstTensor = recvingTpAllocation.tensors[i];
    memcpy(
        dstTensor.buffer.unwrap<tensorpipe::CpuBuffer>().ptr,
        srcTensor.buffer.unwrap<tensorpipe::CpuBuffer>().ptr,
        srcTensor.length);
  }

  // Mimic read() callback:
  // - Unpickle
  c10::intrusive_ptr<torch::distributed::rpc::Message> recvingRpcMessage =
      torch::distributed::rpc::tensorpipeDeserialize(
          std::move(recvingTpDescriptor), std::move(recvingTpBuffers));

  // Data is ready
  EXPECT_EQ(mtype, recvingRpcMessage->type());
  EXPECT_EQ(payloadCopy, recvingRpcMessage->payload());
  EXPECT_EQ(mId, recvingRpcMessage->id());
  EXPECT_TRUE(torch::equal(t1, recvingRpcMessage->tensors()[0]));
  EXPECT_TRUE(torch::equal(t2, recvingRpcMessage->tensors()[1]));
}

TEST(TensorpipeSerialize, RecopySparseTensors) {
  // Take a 1K-element row out of a 1K x 1K (1M-element) tensor, and make sure
  // we don't send all 1M elements across the wire.
  constexpr size_t k1K = 1024;
  at::Tensor main = torch::randn({k1K, k1K});
  at::Tensor tiny = main.select(0, 2); // Select a single row
  EXPECT_EQ(tiny.numel(), k1K);
  EXPECT_EQ(tiny.storage().nbytes() / tiny.itemsize(), k1K * k1K);

  std::vector<at::Tensor> tensors{main, tiny};
  std::vector<char> payload = {'1', '2', '3'};
  torch::distributed::rpc::MessageType mtype =
      torch::distributed::rpc::MessageType::UNKNOWN;
  auto sendingRpcMessage =
      c10::make_intrusive<torch::distributed::rpc::Message>(
          std::move(payload), std::move(tensors), mtype);

  auto [sendingTpMessage, tpBuffers] =
      torch::distributed::rpc::tensorpipeSerialize(
          std::move(sendingRpcMessage), {}, {});

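  // main is sent by pointing at its existing storage, while tiny, whose
  // storage spans the full 1M-element matrix, is cloned into a compact
  // buffer so only its 1K elements are serialized.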
  EXPECT_EQ(tpBuffers.tensors.size(), 2);
  EXPECT_EQ(sendingTpMessage.tensors.size(), 2);
  EXPECT_TRUE(torch::equal(main, tpBuffers.tensors[0]));
  EXPECT_TRUE(torch::equal(tiny, tpBuffers.tensors[1]));
  // Test cloned storage
  EXPECT_EQ(
      main.storage().data(),
      sendingTpMessage.tensors[0].buffer.unwrap<tensorpipe::CpuBuffer>().ptr);
  EXPECT_NE(
      tiny.storage().data(),
      sendingTpMessage.tensors[1].buffer.unwrap<tensorpipe::CpuBuffer>().ptr);
  EXPECT_EQ(tiny.element_size() * k1K, sendingTpMessage.tensors[1].length);
}

TEST(TensorpipeSerialize, NoDeleterTensors) {
  std::vector<float> blob1{.8, .2};
  std::vector<float> blob2{.7, .5, .9};
  at::Tensor t1 = torch::from_blob(blob1.data(), blob1.size());
  at::Tensor t2 = torch::from_blob(blob2.data(), blob2.size());
  std::vector<at::Tensor> tensors{t1, t2};
  std::vector<char> payload = {'1', '2', '3'};
  torch::distributed::rpc::MessageType mtype =
      torch::distributed::rpc::MessageType::UNKNOWN;
  auto sendingRpcMessage =
      c10::make_intrusive<torch::distributed::rpc::Message>(
          std::move(payload), std::move(tensors), mtype);

  auto [sendingTpMessage, tpBuffers] =
      torch::distributed::rpc::tensorpipeSerialize(
          std::move(sendingRpcMessage), {}, {});

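  // Tensors created with torch::from_blob() do not own their memory (no
  // deleter), so serialization copies their bytes into
  // tpBuffers.copiedTensors and points the outgoing message at those copies.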
  EXPECT_EQ(tpBuffers.copiedTensors.size(), 2);
  EXPECT_EQ(sendingTpMessage.tensors.size(), 2);
  EXPECT_EQ(
      tpBuffers.copiedTensors[0].size(), sendingTpMessage.tensors[0].length);
  EXPECT_EQ(
      tpBuffers.copiedTensors[1].size(), sendingTpMessage.tensors[1].length);
  EXPECT_EQ(
      tpBuffers.copiedTensors[0].data(),
      sendingTpMessage.tensors[0].buffer.unwrap<tensorpipe::CpuBuffer>().ptr);
  EXPECT_EQ(
      tpBuffers.copiedTensors[1].data(),
      sendingTpMessage.tensors[1].buffer.unwrap<tensorpipe::CpuBuffer>().ptr);
  EXPECT_TRUE(
      memcmp(
          tpBuffers.copiedTensors[0].data(),
          t1.storage().data(),
          sendingTpMessage.tensors[0].length) == 0);
  EXPECT_TRUE(
      memcmp(
          tpBuffers.copiedTensors[1].data(),
          t2.storage().data(),
          sendingTpMessage.tensors[1].length) == 0);
}