1 #include <gtest/gtest.h>
2
3 #include <c10/util/irange.h>
4 #include <tensorpipe/common/cpu_buffer.h>
5 #include <tensorpipe/core/message.h>
6 #include <torch/csrc/distributed/rpc/tensorpipe_utils.h>
7 #include <torch/torch.h>
8
9 #include <memory>
10 #include <string>
11 #include <vector>
12
// Round-trips an RPC message (payload + two CPU tensors) through
// tensorpipeSerialize -> tensorpipeAllocate -> tensorpipeDeserialize, copying
// the bytes by hand in place of a real tensorpipe transfer, and checks that
// the type, id, payload and tensors of the received message match what was
// sent.
TEST(TensorpipeSerialize, Base) {
  // Sender serializes
  at::Tensor t1 = torch::ones({1024}, at::ScalarType::Int);
  at::Tensor t2 = torch::ones({1024}, at::ScalarType::Float);
  std::vector<at::Tensor> tensors{t1, t2};
  std::vector<char> payload = {'1', '2', '3'};
  // `payload` is moved into the message below; keep a copy to compare against.
  std::vector<char> payloadCopy = payload;
  torch::distributed::rpc::MessageType mtype =
      torch::distributed::rpc::MessageType::UNKNOWN;
  int64_t mId = 100;
  auto sendingRpcMessage =
      c10::make_intrusive<torch::distributed::rpc::Message>(
          std::move(payload), std::move(tensors), mtype);
  sendingRpcMessage->setId(mId);
  auto [sendingTpMessage, sendingTpBuffers] =
      torch::distributed::rpc::tensorpipeSerialize(
          std::move(sendingRpcMessage), {}, {});

  // Mimic receiving message descriptor: recvingTpDescriptor is a copy of
  // sendingTpMessage except for the data pointers which are left null.
  tensorpipe::Descriptor recvingTpDescriptor;
  recvingTpDescriptor.metadata = sendingTpMessage.metadata;
  recvingTpDescriptor.payloads.reserve(sendingTpMessage.payloads.size());
  for (auto& tpPayload : sendingTpMessage.payloads) {
    tensorpipe::Descriptor::Payload p;
    p.length = tpPayload.length;
    p.metadata = tpPayload.metadata;
    recvingTpDescriptor.payloads.push_back(std::move(p));
  }
  EXPECT_EQ(
      recvingTpDescriptor.payloads.size(), sendingTpMessage.payloads.size());
  recvingTpDescriptor.tensors.reserve(sendingTpMessage.tensors.size());
  for (auto& tpTensor : sendingTpMessage.tensors) {
    tensorpipe::Descriptor::Tensor t;
    t.length = tpTensor.length;
    t.sourceDevice = tpTensor.buffer.device();
    t.targetDevice = tpTensor.targetDevice;
    t.metadata = tpTensor.metadata;
    recvingTpDescriptor.tensors.push_back(std::move(t));
  }
  EXPECT_EQ(
      recvingTpDescriptor.tensors.size(), sendingTpMessage.tensors.size());

  // Mimic readDescriptor() callback:
  // - Allocate buffers
  // - Fill pointers in tensorpipe message
  auto [recvingTpAllocation, recvingTpBuffers] =
      torch::distributed::rpc::tensorpipeAllocate(recvingTpDescriptor, {});

  // Mimic tensorpipe data transfer.
  // The loops below index both sides with the same `i`, so a size mismatch
  // must abort the test (ASSERT) instead of reading out of bounds.
  ASSERT_EQ(
      recvingTpAllocation.payloads.size(), sendingTpMessage.payloads.size());
  for (const auto i : c10::irange(recvingTpAllocation.payloads.size())) {
    tensorpipe::Message::Payload& srcPayload = sendingTpMessage.payloads[i];
    tensorpipe::Allocation::Payload& dstPayload =
        recvingTpAllocation.payloads[i];
    if (srcPayload.length) {
      // Empty vector's data() can return nullptr, use the length to avoid
      // copying into nullptr
      memcpy(dstPayload.data, srcPayload.data, srcPayload.length);
    }
  }
  ASSERT_EQ(
      recvingTpAllocation.tensors.size(), sendingTpMessage.tensors.size());
  for (const auto i : c10::irange(recvingTpAllocation.tensors.size())) {
    tensorpipe::Message::Tensor& srcTensor = sendingTpMessage.tensors[i];
    tensorpipe::Allocation::Tensor& dstTensor = recvingTpAllocation.tensors[i];
    memcpy(
        dstTensor.buffer.unwrap<tensorpipe::CpuBuffer>().ptr,
        srcTensor.buffer.unwrap<tensorpipe::CpuBuffer>().ptr,
        srcTensor.length);
  }

  // Mimic read() callback:
  // - Unpickle
  c10::intrusive_ptr<torch::distributed::rpc::Message> recvingRpcMessage =
      torch::distributed::rpc::tensorpipeDeserialize(
          std::move(recvingTpDescriptor), std::move(recvingTpBuffers));

  // Data is ready
  EXPECT_EQ(mtype, recvingRpcMessage->type());
  EXPECT_EQ(payloadCopy, recvingRpcMessage->payload());
  EXPECT_EQ(mId, recvingRpcMessage->id());
  // Two tensors were sent; make sure two came back before indexing them.
  ASSERT_EQ(recvingRpcMessage->tensors().size(), 2u);
  EXPECT_TRUE(torch::equal(t1, recvingRpcMessage->tensors()[0]));
  EXPECT_TRUE(torch::equal(t2, recvingRpcMessage->tensors()[1]));
}
99
// Serializes a full 1K x 1K tensor together with a single-row view into it
// and checks that the view is sent from a separate, row-sized buffer while the
// full tensor is sent straight from its own storage.
TEST(TensorpipeSerialize, RecopySparseTensors) {
  // Take a 1K-element row of a 1M-element tensor, and make sure we don't send
  // across all 1M elements for it.
  constexpr size_t k1K = 1024;
  at::Tensor main = torch::randn({k1K, k1K});
  at::Tensor tiny = main.select(0, 2); // Select a row in the middle
  EXPECT_EQ(tiny.numel(), k1K);
  // The view still refers to the whole 1K x 1K storage.
  EXPECT_EQ(tiny.storage().nbytes() / tiny.itemsize(), k1K * k1K);

  std::vector<at::Tensor> tensors{main, tiny};
  std::vector<char> payload = {'1', '2', '3'};
  torch::distributed::rpc::MessageType mtype =
      torch::distributed::rpc::MessageType::UNKNOWN;
  auto sendingRpcMessage =
      c10::make_intrusive<torch::distributed::rpc::Message>(
          std::move(payload), std::move(tensors), mtype);

  auto [sendingTpMessage, tpBuffers] =
      torch::distributed::rpc::tensorpipeSerialize(
          std::move(sendingRpcMessage), {}, {});

  // Elements [0] and [1] are indexed below, so a wrong count must abort the
  // test (ASSERT) rather than continue into an out-of-bounds access.
  ASSERT_EQ(tpBuffers.tensors.size(), 2);
  ASSERT_EQ(sendingTpMessage.tensors.size(), 2);
  EXPECT_TRUE(torch::equal(main, tpBuffers.tensors[0]));
  EXPECT_TRUE(torch::equal(tiny, tpBuffers.tensors[1]));
  // Test cloned storage: `main` is sent from its original storage pointer,
  // `tiny` is sent from a different pointer whose length is exactly one row.
  EXPECT_EQ(
      main.storage().data(),
      sendingTpMessage.tensors[0].buffer.unwrap<tensorpipe::CpuBuffer>().ptr);
  EXPECT_NE(
      tiny.storage().data(),
      sendingTpMessage.tensors[1].buffer.unwrap<tensorpipe::CpuBuffer>().ptr);
  EXPECT_EQ(tiny.element_size() * k1K, sendingTpMessage.tensors[1].length);
}
133
// Serializes tensors created with torch::from_blob over memory owned by local
// std::vectors and checks that the serialized message points at the byte
// copies held in tpBuffers.copiedTensors, and that those copies have the same
// length and contents as the source tensors.
TEST(TensorpipeSerialize, NoDeleterTensors) {
  std::vector<float> blob1{.8, .2};
  std::vector<float> blob2{.7, .5, .9};
  // data() already yields float*, so no cast is needed for from_blob.
  at::Tensor t1 = torch::from_blob(blob1.data(), blob1.size());
  at::Tensor t2 = torch::from_blob(blob2.data(), blob2.size());
  std::vector<at::Tensor> tensors{t1, t2};
  std::vector<char> payload = {'1', '2', '3'};
  torch::distributed::rpc::MessageType mtype =
      torch::distributed::rpc::MessageType::UNKNOWN;
  auto sendingRpcMessage =
      c10::make_intrusive<torch::distributed::rpc::Message>(
          std::move(payload), std::move(tensors), mtype);

  auto [sendingTpMessage, tpBuffers] =
      torch::distributed::rpc::tensorpipeSerialize(
          std::move(sendingRpcMessage), {}, {});

  // Elements [0] and [1] are indexed below, so a wrong count must abort the
  // test (ASSERT) rather than continue into an out-of-bounds access.
  ASSERT_EQ(tpBuffers.copiedTensors.size(), 2);
  ASSERT_EQ(sendingTpMessage.tensors.size(), 2);

  // Each copy has the same length as the tensor entry in the message...
  EXPECT_EQ(
      tpBuffers.copiedTensors[0].size(), sendingTpMessage.tensors[0].length);
  EXPECT_EQ(
      tpBuffers.copiedTensors[1].size(), sendingTpMessage.tensors[1].length);
  // ...the message points at the copy rather than somewhere else...
  EXPECT_EQ(
      tpBuffers.copiedTensors[0].data(),
      sendingTpMessage.tensors[0].buffer.unwrap<tensorpipe::CpuBuffer>().ptr);
  EXPECT_EQ(
      tpBuffers.copiedTensors[1].data(),
      sendingTpMessage.tensors[1].buffer.unwrap<tensorpipe::CpuBuffer>().ptr);
  // ...and the copied bytes match the source tensor's storage.
  EXPECT_EQ(
      0,
      memcmp(
          tpBuffers.copiedTensors[0].data(),
          t1.storage().data(),
          sendingTpMessage.tensors[0].length));
  EXPECT_EQ(
      0,
      memcmp(
          tpBuffers.copiedTensors[1].data(),
          t2.storage().data(),
          sendingTpMessage.tensors[1].length));
}
174