1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
27
28
32
33
37
38
47
48
53
54
61
62
63
64
83
84
96
97
98
99
100
101
102
103
108
109
110
111
112
113
114
115
116
117
126
127
128
129
130
131
132
133
134
135
136
137
138
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
161
162
163
172
173
174
175
176
177
178
179
180
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
203
204
205
206
207
208
209
210
211
212
213
216
217
218
221
222
223
224
227
228
229
236
237
238
241
242
243
246
247
248
249
252
253
256
257
258
259
260
261
262
263
264
265
268
269
272
273
279
280
284
285
288
289
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
312
313
317
318
321
322
325
326
327
328
329
330
331
332
333
334
335
338
339
340
341
342
343
344
347
348
349
350
351
352
353
360
361
367
368
376
377
378
392
393
401
402
403
404
407
408
411
412
413
414
415
416
417
418
428
429
430
431
432
433
434
435
436
437
438
439
440
441
450
451
452
453
/* ... */
#include <string.h>
#include "osi/allocator.h"
#include "freertos/FreeRTOS.h"
#include "freertos/queue.h"
#include "osi/semaphore.h"
#include "osi/thread.h"
#include "osi/mutex.h"
/* One unit of deferred work: a callback and the argument it is called with. */
struct work_item {
    osi_thread_func_t func;   /* callback executed by the worker task */
    void *context;            /* opaque argument passed to func */
};
/* A FreeRTOS queue holding struct work_item entries, plus the capacity it was created with. */
struct work_queue {
    QueueHandle_t queue;   /* handle returned by xQueueCreate() */
    size_t capacity;       /* number of items the queue was created to hold */
};
/* State of one worker task and the work queues it drains. */
struct osi_thread {
    TaskHandle_t thread_handle;        /* task handle; the task sets it to NULL just before exiting */
    int thread_id;                     /* not referenced in this part of the file */
    bool stop;                         /* set by osi_thread_stop() to make the task loop exit */
    uint8_t work_queue_num;            /* number of entries in work_queues */
    struct work_queue **work_queues;   /* array of queues; the task scans them starting at index 0 */
    osi_sem_t work_sem;                /* given when work is posted or a stop is requested */
    osi_sem_t stop_sem;                /* given by the task after its loop has ended */
};
/* Hand-off data passed from osi_thread_create() to the task it starts. */
struct osi_thread_start_arg {
    osi_thread_t *thread;   /* thread object the new task serves */
    osi_sem_t start_sem;    /* given by the task once it has read this structure */
    int error;              /* neither written nor read in this part of the file */
};
/* An event that can be bound to one thread/queue and posted repeatedly. */
struct osi_event {
    struct work_item item;   /* user callback and context run by the generic event handler */
    osi_mutex_t lock;        /* held while is_queued is updated */
    uint16_t is_queued;      /* non-zero while the event is pending in a work queue */
    uint16_t queue_idx;      /* index of the work queue chosen in osi_event_bind() */
    osi_thread_t *thread;    /* set by osi_event_bind(); a non-NULL value prevents rebinding */
};
/* Queue length used by osi_thread_create() when the caller passes 0 for a queue's length. */
static const size_t DEFAULT_WORK_QUEUE_CAPACITY = 100;
/*
 * Allocate a work queue able to hold `capacity` work items.
 *
 * Returns the new queue, or NULL when capacity is 0 or either the wrapper
 * or the underlying FreeRTOS queue cannot be allocated.
 */
static struct work_queue *osi_work_queue_create(size_t capacity)
{
    struct work_queue *wq;

    if (capacity == 0) {
        return NULL;
    }

    wq = (struct work_queue *)osi_malloc(sizeof(struct work_queue));
    if (wq == NULL) {
        return NULL;
    }

    wq->queue = xQueueCreate(capacity, sizeof(struct work_item));
    if (wq->queue == 0) {
        /* Queue creation failed: release the wrapper so nothing leaks. */
        osi_free(wq);
        return NULL;
    }

    wq->capacity = capacity;
    return wq;
}
/*
 * Destroy a work queue created by osi_work_queue_create().
 * Passing NULL is allowed and does nothing.
 */
static void osi_work_queue_delete(struct work_queue *wq)
{
    if (wq == NULL) {
        return;
    }

    if (wq->queue != 0) {
        vQueueDelete(wq->queue);
    }

    /* Clear the fields before the wrapper itself is released. */
    wq->queue = 0;
    wq->capacity = 0;
    osi_free(wq);
}
/*
 * Try to take one work item from `wq` without blocking.
 *
 * Returns true and fills `*item` when an item was available, false otherwise.
 */
static bool osi_thead_work_queue_get(struct work_queue *wq, struct work_item *item)
{
    assert(wq != NULL);
    assert(wq->queue != 0);
    assert(item != NULL);

    /* A tick timeout of 0 makes the receive a pure poll. */
    return xQueueReceive(wq->queue, item, 0) == pdTRUE;
}
static bool osi_thead_work_queue_put(struct work_queue *wq, const struct work_item *item, uint32_t timeout)
{
assert (wq != NULL);
assert (wq->queue != 0);
assert (item != NULL);
bool ret = true;
if (timeout == OSI_SEM_MAX_TIMEOUT) {
if (xQueueSend(wq->queue, item, portMAX_DELAY) != pdTRUE) {
ret = false;
}{...}
}{...} else {
if (xQueueSend(wq->queue, item, timeout / portTICK_PERIOD_MS) != pdTRUE) {
ret = false;
}{...}
}{...}
return ret;
}{ ... }
/*
 * Report how many work items are currently waiting in `wq`.
 *
 * The count is derived as capacity minus the free slots reported by FreeRTOS.
 * A free-slot count larger than the capacity trips an assertion and, if
 * assertions are compiled out, yields 0.
 */
static size_t osi_thead_work_queue_len(struct work_queue *wq)
{
    size_t free_slots;

    assert(wq != NULL);
    assert(wq->queue != 0);
    assert(wq->capacity != 0);

    free_slots = (size_t)uxQueueSpacesAvailable(wq->queue);
    if (free_slots > wq->capacity) {
        assert(0);
        return 0;
    }

    return wq->capacity - free_slots;
}
/*
 * Task entry point for an osi thread.
 *
 * `arg` points at the creator's struct osi_thread_start_arg. It is only read
 * before start_sem is given; osi_thread_create() releases it afterwards.
 *
 * The task sleeps on work_sem, then drains the work queues. After every
 * executed item the scan restarts at queue 0, so lower-indexed queues are
 * always served first. When `stop` is set the loop ends, stop_sem is given
 * and the task deletes itself.
 */
static void osi_thread_run(void *arg)
{
    struct osi_thread_start_arg *start = (struct osi_thread_start_arg *)arg;
    osi_thread_t *thread = start->thread;

    /* Tell the creator that the start argument has been consumed. */
    osi_sem_give(&start->start_sem);

    for (;;) {
        struct work_item item;
        int idx = 0;

        osi_sem_take(&thread->work_sem, OSI_SEM_MAX_TIMEOUT);
        if (thread->stop) {
            break;
        }

        while (!thread->stop && idx < thread->work_queue_num) {
            if (!osi_thead_work_queue_get(thread->work_queues[idx], &item)) {
                /* This queue is empty: move on to the next one. */
                idx++;
                continue;
            }

            item.func(item.context);
            /* Restart from the first queue after running an item. */
            idx = 0;
        }
    }

    thread->thread_handle = NULL;
    osi_sem_give(&thread->stop_sem);
    vTaskDelete(NULL);
}
/*
 * Wait up to `wait_ms` for the thread's task to give stop_sem.
 *
 * Returns the result of osi_sem_take(); osi_thread_stop() treats a non-zero
 * value as "the task did not finish in time".
 */
static int osi_thread_join(osi_thread_t *thread, uint32_t wait_ms)
{
    int result;

    assert(thread != NULL);

    result = osi_sem_take(&thread->stop_sem, wait_ms);
    return result;
}
/*
 * Ask the thread's task to exit and wait up to 1000 for it to do so
 * (the value is passed to osi_thread_join() as wait_ms).
 *
 * If the join fails and the task handle is still set, the task is deleted
 * forcibly.
 */
static void osi_thread_stop(osi_thread_t *thread)
{
    assert(thread != NULL);

    /* Raise the stop flag, then wake the task so that it notices the flag. */
    thread->stop = true;
    osi_sem_give(&thread->work_sem);

    if (osi_thread_join(thread, 1000) != 0 && thread->thread_handle) {
        vTaskDelete(thread->thread_handle);
    }
}
/*
 * Create a worker thread with `work_queue_num` work queues and start its task.
 *
 * name            task name passed to xTaskCreatePinnedToCore()
 * stack_size      task stack size; must be non-zero
 * priority        task priority
 * core            must lie in [OSI_THREAD_CORE_0, OSI_THREAD_CORE_AFFINITY]
 * work_queue_num  number of work queues; must be non-zero
 * work_queue_len  per-queue lengths; a 0 entry selects
 *                 DEFAULT_WORK_QUEUE_CAPACITY
 *
 * Returns the new thread object, or NULL on invalid arguments or when any
 * allocation fails. On failure everything acquired so far is released.
 * The call returns only after the new task has signalled that it started.
 */
osi_thread_t *osi_thread_create(const char *name, size_t stack_size, int priority, osi_thread_core_t core, uint8_t work_queue_num, const size_t work_queue_len[])
{
    struct osi_thread_start_arg start_arg = {0};
    osi_thread_t *thread;

    if (stack_size == 0 || work_queue_num == 0 || work_queue_len == NULL) {
        return NULL;
    }
    if (core < OSI_THREAD_CORE_0 || core > OSI_THREAD_CORE_AFFINITY) {
        return NULL;
    }

    thread = (osi_thread_t *)osi_calloc(sizeof(osi_thread_t));
    if (thread == NULL) {
        /* Nothing has been acquired yet, so there is nothing to clean up. */
        return NULL;
    }

    thread->stop = false;

    thread->work_queues = (struct work_queue **)osi_calloc(sizeof(struct work_queue *) * work_queue_num);
    if (thread->work_queues == NULL) {
        goto fail;
    }
    thread->work_queue_num = work_queue_num;

    for (int i = 0; i < thread->work_queue_num; i++) {
        size_t queue_len = work_queue_len[i];

        if (queue_len == 0) {
            queue_len = DEFAULT_WORK_QUEUE_CAPACITY;
        }

        thread->work_queues[i] = osi_work_queue_create(queue_len);
        if (thread->work_queues[i] == NULL) {
            goto fail;
        }
    }

    if (osi_sem_new(&thread->work_sem, 1, 0) != 0) {
        goto fail;
    }

    if (osi_sem_new(&thread->stop_sem, 1, 0) != 0) {
        goto fail;
    }

    start_arg.thread = thread;
    if (osi_sem_new(&start_arg.start_sem, 1, 0) != 0) {
        goto fail;
    }

    if (xTaskCreatePinnedToCore(osi_thread_run, name, stack_size, &start_arg, priority, &thread->thread_handle, core) != pdPASS) {
        goto fail;
    }

    /*
     * start_arg lives on this stack frame, so wait until the new task has
     * read it before releasing the semaphore and returning.
     */
    osi_sem_take(&start_arg.start_sem, OSI_SEM_MAX_TIMEOUT);
    osi_sem_free(&start_arg.start_sem);

    return thread;

fail:
    if (start_arg.start_sem) {
        osi_sem_free(&start_arg.start_sem);
    }

    if (thread->thread_handle) {
        vTaskDelete(thread->thread_handle);
    }

    if (thread->work_queues) {
        for (int i = 0; i < thread->work_queue_num; i++) {
            if (thread->work_queues[i]) {
                osi_work_queue_delete(thread->work_queues[i]);
                thread->work_queues[i] = NULL;
            }
        }
        osi_free(thread->work_queues);
        thread->work_queues = NULL;
    }

    if (thread->work_sem) {
        osi_sem_free(&thread->work_sem);
    }

    if (thread->stop_sem) {
        osi_sem_free(&thread->stop_sem);
    }

    osi_free(thread);
    return NULL;
}
/*
 * Stop a thread created by osi_thread_create() and release all of its
 * resources: the work queues, the queue array, both semaphores and the
 * thread object itself. Passing NULL is allowed and does nothing.
 *
 * The thread pointer must not be used after this call.
 */
void osi_thread_free(osi_thread_t *thread)
{
    if (!thread) {
        return;
    }

    osi_thread_stop(thread);

    /*
     * Guard the whole queue teardown on the array pointer, so the loop never
     * dereferences a NULL work_queues array.
     */
    if (thread->work_queues) {
        for (int i = 0; i < thread->work_queue_num; i++) {
            if (thread->work_queues[i]) {
                osi_work_queue_delete(thread->work_queues[i]);
                thread->work_queues[i] = NULL;
            }
        }
        osi_free(thread->work_queues);
        thread->work_queues = NULL;
    }

    if (thread->work_sem) {
        osi_sem_free(&thread->work_sem);
    }

    if (thread->stop_sem) {
        osi_sem_free(&thread->stop_sem);
    }

    osi_free(thread);
}
/*
 * Queue `func(context)` for execution on `thread`.
 *
 * thread     target thread; must not be NULL
 * func       callback to run; must not be NULL
 * context    opaque argument passed to func
 * queue_idx  index of the work queue to use, in [0, work_queue_num)
 * timeout    how long to wait for free queue space, forwarded to
 *            osi_thead_work_queue_put()
 *
 * Returns true when the item was queued and the thread was signalled, false
 * when queue_idx is out of range or the queue stayed full for the whole
 * timeout.
 */
bool osi_thread_post(osi_thread_t *thread, osi_thread_func_t func, void *context, int queue_idx, uint32_t timeout)
{
    assert(thread != NULL);
    assert(func != NULL);

    /* queue_idx is signed: reject negative values as well as too-large ones. */
    if (queue_idx < 0 || queue_idx >= thread->work_queue_num) {
        return false;
    }

    struct work_item item = {
        .func = func,
        .context = context,
    };

    if (!osi_thead_work_queue_put(thread->work_queues[queue_idx], &item, timeout)) {
        return false;
    }

    /* Wake the worker task so that it picks up the new item. */
    osi_sem_give(&thread->work_sem);
    return true;
}
/*
 * Change the priority of the thread's task.
 * `thread` must not be NULL. Always returns true.
 */
bool osi_thread_set_priority(osi_thread_t *thread, int priority)
{
    assert(thread != NULL);

    TaskHandle_t task = thread->thread_handle;
    vTaskPrioritySet(task, priority);

    return true;
}
/*
 * Return the name of the thread's task as reported by pcTaskGetName().
 * `thread` must not be NULL.
 */
const char *osi_thread_name(osi_thread_t *thread)
{
    assert(thread != NULL);

    const char *task_name = pcTaskGetName(thread->thread_handle);
    return task_name;
}
/*
 * Report how many work items are waiting in one of the thread's queues.
 *
 * thread  thread to inspect
 * wq_idx  index of the work queue, in [0, work_queue_num)
 *
 * Returns the number of queued items, or -1 when `thread` is NULL or
 * `wq_idx` is out of range.
 */
int osi_thread_queue_wait_size(osi_thread_t *thread, int wq_idx)
{
    /* Report a NULL thread through the existing -1 error value. */
    if (thread == NULL) {
        return -1;
    }

    if (wq_idx < 0 || wq_idx >= thread->work_queue_num) {
        return -1;
    }

    return (int)osi_thead_work_queue_len(thread->work_queues[wq_idx]);
}
/*
 * Allocate an event that will run `func(context)` when it is dispatched.
 *
 * The event is not bound to a thread yet; see osi_event_bind().
 * Returns the new event, or NULL when the allocation or the mutex creation
 * fails.
 */
struct osi_event *osi_event_create(osi_thread_func_t func, void *context)
{
    struct osi_event *event = osi_calloc(sizeof(struct osi_event));

    if (event == NULL) {
        return NULL;
    }

    if (osi_mutex_new(&event->lock) != 0) {
        /* No mutex, no event: release the half-built object. */
        osi_free(event);
        return NULL;
    }

    event->item.func = func;
    event->item.context = context;
    return event;
}
/*
 * Destroy an event created by osi_event_create().
 * Passing NULL is allowed and does nothing.
 */
void osi_event_delete(struct osi_event* event)
{
    if (event == NULL) {
        return;
    }

    osi_mutex_free(&event->lock);

    /* Wipe the structure before the memory is released. */
    memset(event, 0, sizeof(struct osi_event));
    osi_free(event);
}
/*
 * Bind `event` to work queue `queue_idx` of `thread`.
 *
 * An event can be bound only once. Returns false when `event` is NULL or
 * already bound, when `thread` is NULL, or when `queue_idx` is outside
 * [0, work_queue_num). Returns true when the binding was recorded.
 */
bool osi_event_bind(struct osi_event* event, osi_thread_t *thread, int queue_idx)
{
    if (event == NULL || event->thread != NULL) {
        return false;
    }

    if (thread == NULL) {
        return false;
    }

    /*
     * queue_idx is signed but stored in a uint16_t field: a negative value
     * would be truncated into a bogus index, so reject it here.
     */
    if (queue_idx < 0 || queue_idx >= thread->work_queue_num) {
        return false;
    }

    event->thread = thread;
    event->queue_idx = (uint16_t)queue_idx;

    return true;
}
/*
 * Work-queue callback used for events posted with osi_thread_post_event().
 *
 * `context` is the struct osi_event. The queued flag is cleared under the
 * event lock before the user callback runs, so the event can be posted again
 * while its callback is executing.
 */
static void osi_thread_generic_event_handler(void *context)
{
    struct osi_event *event = (struct osi_event *)context;

    if (event == NULL || event->item.func == NULL) {
        return;
    }

    osi_mutex_lock(&event->lock, OSI_MUTEX_MAX_TIMEOUT);
    event->is_queued = 0;
    osi_mutex_unlock(&event->lock);

    event->item.func(event->item.context);
}
/*
 * Post a bound event to its thread, unless it is already pending.
 *
 * event    event previously bound with osi_event_bind(); must not be NULL
 * timeout  forwarded to osi_thread_post()
 *
 * Returns true only when this call queued the event. Returns false when the
 * event was already queued, when another caller claimed it first, or when
 * osi_thread_post() failed; in the last case the queued flag is reset.
 */
bool osi_thread_post_event(struct osi_event *event, uint32_t timeout)
{
    assert(event != NULL && event->thread != NULL);
    assert(event->queue_idx >= 0 && event->queue_idx < event->thread->work_queue_num);

    /* Unlocked early-out: the event is already pending. */
    if (event->is_queued != 0) {
        return false;
    }

    /* Claim the event under the lock; only the caller that sees 1 posts it. */
    osi_mutex_lock(&event->lock, OSI_MUTEX_MAX_TIMEOUT);
    event->is_queued += 1;
    uint16_t claim = event->is_queued;
    osi_mutex_unlock(&event->lock);

    if (claim != 1) {
        return false;
    }

    bool posted = osi_thread_post(event->thread, osi_thread_generic_event_handler, event, event->queue_idx, timeout);
    if (!posted) {
        /* Posting failed: release the claim so a later post can retry. */
        osi_mutex_lock(&event->lock, OSI_MUTEX_MAX_TIMEOUT);
        event->is_queued = 0;
        osi_mutex_unlock(&event->lock);
    }

    return posted;
}