1 /*
2 * Copyright (c) 2006-2018, RT-Thread Development Team
3 *
4 * SPDX-License-Identifier: Apache-2.0
5 *
6 * Change Logs:
7 * Date Author Notes
8 * 2012-09-30 Bernard first version.
9 */
10
11 #include <rthw.h>
12 #include <rtthread.h>
13 #include <rtdevice.h>
14
15 #include "audio_pipe.h"
16
_rt_pipe_resume_writer(struct rt_audio_pipe * pipe)17 static void _rt_pipe_resume_writer(struct rt_audio_pipe *pipe)
18 {
19 if (!rt_list_isempty(&pipe->suspended_write_list))
20 {
21 rt_thread_t thread;
22
23 RT_ASSERT(pipe->flag & RT_PIPE_FLAG_BLOCK_WR);
24
25 /* get suspended thread */
26 thread = rt_list_entry(pipe->suspended_write_list.next,
27 struct rt_thread,
28 tlist);
29
30 /* resume the write thread */
31 rt_thread_resume(thread);
32
33 rt_schedule();
34 }
35 }
36
/*
 * Device read handler: move up to size bytes from the pipe's ring buffer
 * into buffer.
 *
 * Without RT_PIPE_FLAG_BLOCK_RD the call never waits: it returns whatever
 * rt_ringbuffer_get() delivered (possibly 0) and, if anything was read,
 * wakes a waiting writer.
 *
 * With RT_PIPE_FLAG_BLOCK_RD the calling thread is queued on
 * suspended_read_list and suspended until at least one byte could be read.
 * A zero-length request returns 0 immediately instead of waiting.
 *
 * @param dev    the pipe device (a struct rt_audio_pipe passed as rt_device_t)
 * @param pos    not used by this handler
 * @param buffer destination for the data
 * @param size   maximum number of bytes to read
 *
 * @return the number of bytes actually read
 */
static rt_size_t rt_pipe_read(rt_device_t dev,
                              rt_off_t    pos,
                              void       *buffer,
                              rt_size_t   size)
{
    rt_uint32_t level;
    rt_thread_t thread;
    struct rt_audio_pipe *pipe;
    rt_size_t read_nbytes;

    (void)pos;

    pipe = (struct rt_audio_pipe *)dev;
    RT_ASSERT(pipe != RT_NULL);

    if (!(pipe->flag & RT_PIPE_FLAG_BLOCK_RD))
    {
        level = rt_hw_interrupt_disable();
        read_nbytes = rt_ringbuffer_get(&(pipe->ringbuffer), buffer, size);

        /* if the ringbuffer is empty, there won't be any writer waiting */
        if (read_nbytes)
            _rt_pipe_resume_writer(pipe);

        rt_hw_interrupt_enable(level);

        return read_nbytes;
    }

    thread = rt_thread_self();

    /* current context checking */
    RT_DEBUG_NOT_IN_INTERRUPT;

    /*
     * The loop below only ends once a non-zero byte count has been read.
     * A zero-length request can presumably never yield one (assumes
     * rt_ringbuffer_get() returns at most the requested length), so without
     * this guard the caller would be re-suspended on every wake-up forever.
     */
    if (size == 0)
        return 0;

    for (;;)
    {
        level = rt_hw_interrupt_disable();
        read_nbytes = rt_ringbuffer_get(&(pipe->ringbuffer), buffer, size);
        if (read_nbytes != 0)
        {
            /* room was freed in the ring buffer: let a waiting writer run */
            _rt_pipe_resume_writer(pipe);
            rt_hw_interrupt_enable(level);
            break;
        }

        /* pipe empty: suspend and queue on the suspended read list */
        rt_thread_suspend(thread);
        rt_list_insert_before(&(pipe->suspended_read_list),
                              &(thread->tlist));
        rt_hw_interrupt_enable(level);

        /* switch away; the next loop pass retries the read after wake-up */
        rt_schedule();
    }

    return read_nbytes;
}
92
_rt_pipe_resume_reader(struct rt_audio_pipe * pipe)93 static void _rt_pipe_resume_reader(struct rt_audio_pipe *pipe)
94 {
95 if (pipe->parent.rx_indicate)
96 pipe->parent.rx_indicate(&pipe->parent,
97 rt_ringbuffer_data_len(&pipe->ringbuffer));
98
99 if (!rt_list_isempty(&pipe->suspended_read_list))
100 {
101 rt_thread_t thread;
102
103 RT_ASSERT(pipe->flag & RT_PIPE_FLAG_BLOCK_RD);
104
105 /* get suspended thread */
106 thread = rt_list_entry(pipe->suspended_read_list.next,
107 struct rt_thread,
108 tlist);
109
110 /* resume the read thread */
111 rt_thread_resume(thread);
112
113 rt_schedule();
114 }
115 }
116
/*
 * Device write handler: copy up to size bytes from buffer into the pipe's
 * ring buffer.
 *
 * If RT_PIPE_FLAG_FORCE_WR is set, or RT_PIPE_FLAG_BLOCK_WR is clear, the
 * call never waits: data is stored with rt_ringbuffer_put_force() (force
 * mode) or rt_ringbuffer_put(), the reader side is notified, and the byte
 * count is returned.
 *
 * Otherwise (blocking write) the calling thread is queued on
 * suspended_write_list and suspended until at least one byte could be
 * stored. A zero-length request returns 0 immediately instead of waiting.
 *
 * @param dev    the pipe device (a struct rt_audio_pipe passed as rt_device_t)
 * @param pos    not used by this handler
 * @param buffer source data
 * @param size   number of bytes offered for writing
 *
 * @return the number of bytes actually written
 */
static rt_size_t rt_pipe_write(rt_device_t dev,
                               rt_off_t    pos,
                               const void *buffer,
                               rt_size_t   size)
{
    rt_uint32_t level;
    rt_thread_t thread;
    struct rt_audio_pipe *pipe;
    rt_size_t write_nbytes;

    (void)pos;

    pipe = (struct rt_audio_pipe *)dev;
    RT_ASSERT(pipe != RT_NULL);

    if ((pipe->flag & RT_PIPE_FLAG_FORCE_WR) ||
        !(pipe->flag & RT_PIPE_FLAG_BLOCK_WR))
    {
        level = rt_hw_interrupt_disable();

        if (pipe->flag & RT_PIPE_FLAG_FORCE_WR)
            write_nbytes = rt_ringbuffer_put_force(&(pipe->ringbuffer),
                                                   buffer, size);
        else
            write_nbytes = rt_ringbuffer_put(&(pipe->ringbuffer),
                                             buffer, size);

        _rt_pipe_resume_reader(pipe);

        rt_hw_interrupt_enable(level);

        return write_nbytes;
    }

    thread = rt_thread_self();

    /* current context checking */
    RT_DEBUG_NOT_IN_INTERRUPT;

    /*
     * The loop below only ends once a non-zero byte count has been stored.
     * A zero-length request can presumably never yield one (assumes
     * rt_ringbuffer_put() returns at most the requested length), so without
     * this guard the caller would be re-suspended on every wake-up forever.
     */
    if (size == 0)
        return 0;

    for (;;)
    {
        level = rt_hw_interrupt_disable();
        write_nbytes = rt_ringbuffer_put(&(pipe->ringbuffer), buffer, size);
        if (write_nbytes != 0)
        {
            /* new data is available: notify / wake the reader side */
            _rt_pipe_resume_reader(pipe);
            rt_hw_interrupt_enable(level);
            break;
        }

        /* pipe full: suspend and queue on the suspended write list */
        rt_thread_suspend(thread);
        rt_list_insert_before(&(pipe->suspended_write_list),
                              &(thread->tlist));
        rt_hw_interrupt_enable(level);

        /* switch away; the next loop pass retries the write after wake-up */
        rt_schedule();
    }

    return write_nbytes;
}
178
rt_pipe_control(rt_device_t dev,int cmd,void * args)179 static rt_err_t rt_pipe_control(rt_device_t dev, int cmd, void *args)
180 {
181 struct rt_audio_pipe *pipe;
182
183 pipe = (struct rt_audio_pipe *)dev;
184
185 if (cmd == PIPE_CTRL_GET_SPACE && args)
186 *(rt_size_t*)args = rt_ringbuffer_space_len(&pipe->ringbuffer);
187 return RT_EOK;
188 }
189
190 #ifdef RT_USING_DEVICE_OPS
191 const static struct rt_device_ops audio_pipe_ops =
192 {
193 RT_NULL,
194 RT_NULL,
195 RT_NULL,
196 rt_pipe_read,
197 rt_pipe_write,
198 rt_pipe_control
199 };
200 #endif
201
202 /**
203 * This function will initialize a pipe device and put it under control of
204 * resource management.
205 *
206 * @param pipe the pipe device
207 * @param name the name of pipe device
208 * @param flag the attribute of the pipe device
209 * @param buf the buffer of pipe device
210 * @param size the size of pipe device buffer
211 *
212 * @return the operation status, RT_EOK on successful
213 */
rt_audio_pipe_init(struct rt_audio_pipe * pipe,const char * name,rt_int32_t flag,rt_uint8_t * buf,rt_size_t size)214 rt_err_t rt_audio_pipe_init(struct rt_audio_pipe *pipe,
215 const char *name,
216 rt_int32_t flag,
217 rt_uint8_t *buf,
218 rt_size_t size)
219 {
220 RT_ASSERT(pipe);
221 RT_ASSERT(buf);
222
223 /* initialize suspended list */
224 rt_list_init(&pipe->suspended_read_list);
225 rt_list_init(&pipe->suspended_write_list);
226
227 /* initialize ring buffer */
228 rt_ringbuffer_init(&pipe->ringbuffer, buf, size);
229
230 pipe->flag = flag;
231
232 /* create pipe */
233 pipe->parent.type = RT_Device_Class_Pipe;
234 #ifdef RT_USING_DEVICE_OPS
235 pipe->parent.ops = &audio_pipe_ops;
236 #else
237 pipe->parent.init = RT_NULL;
238 pipe->parent.open = RT_NULL;
239 pipe->parent.close = RT_NULL;
240 pipe->parent.read = rt_pipe_read;
241 pipe->parent.write = rt_pipe_write;
242 pipe->parent.control = rt_pipe_control;
243 #endif
244
245 return rt_device_register(&(pipe->parent), name, RT_DEVICE_FLAG_RDWR);
246 }
247
248 /**
249 * This function will detach a pipe device from resource management
250 *
251 * @param pipe the pipe device
252 *
253 * @return the operation status, RT_EOK on successful
254 */
rt_audio_pipe_detach(struct rt_audio_pipe * pipe)255 rt_err_t rt_audio_pipe_detach(struct rt_audio_pipe *pipe)
256 {
257 return rt_device_unregister(&pipe->parent);
258 }
259
260 #ifdef RT_USING_HEAP
rt_audio_pipe_create(const char * name,rt_int32_t flag,rt_size_t size)261 rt_err_t rt_audio_pipe_create(const char *name, rt_int32_t flag, rt_size_t size)
262 {
263 rt_uint8_t *rb_memptr = RT_NULL;
264 struct rt_audio_pipe *pipe = RT_NULL;
265
266 /* get aligned size */
267 size = RT_ALIGN(size, RT_ALIGN_SIZE);
268 pipe = (struct rt_audio_pipe *)rt_calloc(1, sizeof(struct rt_audio_pipe));
269 if (pipe == RT_NULL)
270 return -RT_ENOMEM;
271
272 /* create ring buffer of pipe */
273 rb_memptr = rt_malloc(size);
274 if (rb_memptr == RT_NULL)
275 {
276 rt_free(pipe);
277 return -RT_ENOMEM;
278 }
279
280 return rt_audio_pipe_init(pipe, name, flag, rb_memptr, size);
281 }
282
rt_audio_pipe_destroy(struct rt_audio_pipe * pipe)283 void rt_audio_pipe_destroy(struct rt_audio_pipe *pipe)
284 {
285 if (pipe == RT_NULL)
286 return;
287
288 /* un-register pipe device */
289 rt_audio_pipe_detach(pipe);
290
291 /* release memory */
292 rt_free(pipe->ringbuffer.buffer_ptr);
293 rt_free(pipe);
294
295 return;
296 }
297
298 #endif /* RT_USING_HEAP */
299