Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ set(C_WARNING_GNU -Wall -Wextra -Wpedantic -pedantic-errors
-Wno-empty-body -Wno-unused-parameter -Wno-missing-field-initializers -Wno-sign-compare -Wno-type-limits)
set(C_WARNING_Clang -Wall -Wextra -Wpedantic
-Wstrict-prototypes -Wold-style-definition
-Wno-unused-parameter -Wno-missing-field-initializers -Wno-sign-compare -Wno-language-extension-token
-Wthread-safety -Wno-unused-parameter -Wno-missing-field-initializers -Wno-sign-compare -Wno-language-extension-token
-Wno-gnu-statement-expression-from-macro-expansion)

# Set default build type. Must use FORCE because project() sets default to ""
Expand Down Expand Up @@ -140,7 +140,7 @@ if(NOT DEFINED VERSION)
OUTPUT_VARIABLE BRANCH_NAME
OUTPUT_STRIP_TRAILING_WHITESPACE)
message(STATUS "BRANCH_NAME is ${BRANCH_NAME}")

if (BRANCH_NAME STREQUAL "HEAD")
set(QPID_DISPATCH_VERSION "0.0.0")
else(BRANCH_NAME STREQUAL "HEAD")
Expand Down Expand Up @@ -172,7 +172,7 @@ if(NOT DEFINED VERSION)
# Git executable was not available, we will not be able to determine the version, just set it to "0.0.0"
set(QPID_DISPATCH_VERSION "0.0.0")
endif(Git_FOUND)

else(NOT DEFINED VERSION)
message(STATUS "VERSION is already provided")
# What if VERSION is defined but someone passed in an empty value for VERSION? Deal with that case here.
Expand All @@ -185,7 +185,7 @@ else(NOT DEFINED VERSION)
else()
set(QPID_DISPATCH_VERSION ${VERSION})
endif()
endif(VERSION STREQUAL "")
endif(VERSION STREQUAL "")
endif(NOT DEFINED VERSION)

message(STATUS "Setting skupper-router version to ${QPID_DISPATCH_VERSION}")
Expand Down
86 changes: 86 additions & 0 deletions include/qpid/dispatch/internal/thread_annotations.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

// Copyright 2017 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#pragma once

// This header defines thread annotation macros to be used everywhere in Zircon
// outside of publicly exposed headers. See system/public/zircon/compiler.h for
// the publicly exported macros.

// The thread safety analysis system is documented at
// http://clang.llvm.org/docs/ThreadSafetyAnalysis.html and its use in Zircon is documented at
// docs/thread_annotations.md. The macros we use are:
//
// TA_CAP(x) |x| is the capability this type represents, e.g. "mutex".
// TA_GUARDED(x) the annotated variable is guarded by the capability (e.g. lock) |x|
// TA_ACQ(x) function acquires the mutex |x|
// TA_ACQ_SHARED(x) function acquires the mutex |x| for shared reading
// TA_ACQ_BEFORE(x) Indicates that if both this mutex and muxex |x| are to be acquired,
// that this mutex must be acquired before mutex |x|.
// TA_ACQ_AFTER(x) Indicates that if both this mutex and muxex |x| are to be acquired,
// that this mutex must be acquired after mutex |x|.
// TA_TRY_ACQ(bool, x) function acquires the mutex |x| if the function returns |bool|
// TA_TRY_ACQ_SHARED(bool, x) function acquires the mutex |x| for shared reading if the function
// returns |bool|
// TA_REL(x) function releases the mutex |x|
// TA_REL_SHARED(x) function releases the shared for reading mutex |x|
// TA_ASSERT(x) function asserts that |x| is held
// TA_ASSERT_SHARED(x) function asserts that |x| is held for shared reading
// TA_REQ(x) function requires that the caller hold the mutex |x|
// TA_REQ_SHARED(x) function requires that the caller hold the mutex |x| for shared
// reading TA_EXCL(x) function requires that the caller not be holding the mutex
// |x| TA_RET_CAP(x) function returns a reference to the mutex |x| TA_SCOPED_CAP type
// represents a scoped or RAII-style wrapper around a capability TA_NO_THREAD_SAFETY_ANALYSIS
// function is excluded entirely from thread safety analysis

#ifdef __clang__
#define TA_SUPPRESS _Pragma("clang diagnostic ignored \"-Wthread-safety-analysis\"")
#else
#define TA_SUPPRESS
#endif

#ifdef __clang__
#define THREAD_ANNOTATION(x) __attribute__((x))
#else
#define THREAD_ANNOTATION(x)
#endif

#define TA_CAP(x) THREAD_ANNOTATION(capability(x))
#define TA_GUARDED(x) THREAD_ANNOTATION(guarded_by(x))
#define TA_ACQ(...) THREAD_ANNOTATION(acquire_capability(__VA_ARGS__))
#define TA_ACQ_SHARED(...) THREAD_ANNOTATION(acquire_shared_capability(__VA_ARGS__))
#define TA_ACQ_BEFORE(...) THREAD_ANNOTATION(acquired_before(__VA_ARGS__))
#define TA_ACQ_AFTER(...) THREAD_ANNOTATION(acquired_after(__VA_ARGS__))
#define TA_TRY_ACQ(...) THREAD_ANNOTATION(try_acquire_capability(__VA_ARGS__))
#define TA_TRY_ACQ_SHARED(...) THREAD_ANNOTATION(try_acquire_shared_capability(__VA_ARGS__))
#define TA_REL(...) THREAD_ANNOTATION(release_capability(__VA_ARGS__))
#define TA_REL_SHARED(...) THREAD_ANNOTATION(release_shared_capability(__VA_ARGS__))
#define TA_REL_GENERIC(...) THREAD_ANNOTATION(release_generic_capability(__VA_ARGS__))
#define TA_ASSERT(...) THREAD_ANNOTATION(assert_capability(__VA_ARGS__))
#define TA_ASSERT_SHARED(...) THREAD_ANNOTATION(assert_shared_capability(__VA_ARGS__))
#define TA_REQ(...) THREAD_ANNOTATION(requires_capability(__VA_ARGS__))
#define TA_REQ_SHARED(...) THREAD_ANNOTATION(requires_shared_capability(__VA_ARGS__))
#define TA_EXCL(...) THREAD_ANNOTATION(locks_excluded(__VA_ARGS__))
#define TA_RET_CAP(x) THREAD_ANNOTATION(lock_returned(x))
#define TA_SCOPED_CAP THREAD_ANNOTATION(scoped_lockable)
#define TA_NO_THREAD_SAFETY_ANALYSIS THREAD_ANNOTATION(no_thread_safety_analysis)
26 changes: 14 additions & 12 deletions include/qpid/dispatch/threading.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,38 +26,40 @@
#include <assert.h>
#include <pthread.h>

typedef struct sys_mutex_t sys_mutex_t;
#include "qpid/dispatch/internal/thread_annotations.h"

typedef struct sys_mutex_t TA_CAP("mutex") sys_mutex_t;
struct sys_mutex_t {
pthread_mutex_t mutex;
};

void sys_mutex_init(sys_mutex_t *mutex);
void sys_mutex_free(sys_mutex_t *mutex);
void sys_mutex_lock(sys_mutex_t *mutex);
void sys_mutex_unlock(sys_mutex_t *mutex);
void sys_mutex_lock(sys_mutex_t *mutex) TA_ACQ(*mutex);
void sys_mutex_unlock(sys_mutex_t *mutex) TA_REL(*mutex);

typedef struct sys_cond_t sys_cond_t;
typedef struct sys_cond_t TA_CAP("cond") sys_cond_t;
struct sys_cond_t {
pthread_cond_t cond;
};

void sys_cond_init(sys_cond_t *cond);
void sys_cond_free(sys_cond_t *cond);
void sys_cond_wait(sys_cond_t *cond, sys_mutex_t *held_mutex);
void sys_cond_wait(sys_cond_t *cond, sys_mutex_t *held_mutex) TA_REQ(*held_mutex);
void sys_cond_signal(sys_cond_t *cond);
void sys_cond_signal_all(sys_cond_t *cond);


typedef struct sys_rwlock_t sys_rwlock_t;
typedef struct sys_rwlock_t TA_CAP("rwlock") sys_rwlock_t;
struct sys_rwlock_t {
pthread_rwlock_t lock;
};

void sys_rwlock_init(sys_rwlock_t *lock);
void sys_rwlock_free(sys_rwlock_t *lock);
void sys_rwlock_wrlock(sys_rwlock_t *lock);
void sys_rwlock_rdlock(sys_rwlock_t *lock);
void sys_rwlock_unlock(sys_rwlock_t *lock);
void sys_rwlock_wrlock(sys_rwlock_t *lock) TA_ACQ(*lock);
void sys_rwlock_rdlock(sys_rwlock_t *lock) TA_ACQ_SHARED(*lock);
void sys_rwlock_unlock(sys_rwlock_t *lock) TA_REL_GENERIC(*lock);

typedef enum {
SYS_THREAD_MAIN,
Expand Down Expand Up @@ -108,16 +110,16 @@ typedef enum {
SYS_THREAD_PROACTOR_MODE_TIMER = SYS_THREAD_PROACTOR_MODE_OTHER,
} sys_thread_proactor_mode_t;

typedef struct sys_spinlock_t sys_spinlock_t;
typedef struct sys_spinlock_t TA_CAP("spinlock") sys_spinlock_t;
struct sys_spinlock_t {
pthread_mutexattr_t attr;
pthread_mutex_t lock;
};

void sys_spinlock_init(sys_spinlock_t *lock);
void sys_spinlock_free(sys_spinlock_t *lock);
void sys_spinlock_lock(sys_spinlock_t *lock);
void sys_spinlock_unlock(sys_spinlock_t *lock);
void sys_spinlock_lock(sys_spinlock_t *lock) TA_ACQ(*lock);
void sys_spinlock_unlock(sys_spinlock_t *lock) TA_REL(*lock);


typedef struct sys_thread_t sys_thread_t;
Expand Down
4 changes: 2 additions & 2 deletions src/dispatch.c
Original file line number Diff line number Diff line change
Expand Up @@ -398,8 +398,8 @@ void qd_dispatch_free(qd_dispatch_t *qd)
}


QD_EXPORT void qd_dispatch_router_lock(qd_dispatch_t *qd) { sys_mutex_lock(&qd->router->lock); }
QD_EXPORT void qd_dispatch_router_unlock(qd_dispatch_t *qd) { sys_mutex_unlock(&qd->router->lock); }
QD_EXPORT void qd_dispatch_router_lock(qd_dispatch_t *qd) TA_ACQ(qd->router->lock) { sys_mutex_lock(&qd->router->lock); }
QD_EXPORT void qd_dispatch_router_unlock(qd_dispatch_t *qd) TA_REL(qd->router->lock) { sys_mutex_unlock(&qd->router->lock); }

qdr_core_t* qd_dispatch_router_core(const qd_dispatch_t *qd)
{
Expand Down
5 changes: 3 additions & 2 deletions src/entity_cache.c
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ void qd_entity_cache_remove(const char *type, void *object) { push_event(REMOVE,
// Locks the entity cache so entities can be updated safely (prevent entities from being deleted.)
// Do not process any entities if return error code != 0
// Must call qd_entity_refresh_end when done, regardless of error code.
QD_EXPORT qd_error_t qd_entity_refresh_begin(PyObject *list) {
QD_EXPORT qd_error_t qd_entity_refresh_begin(PyObject *list) TA_ACQ(event_lock)
{
qd_error_clear();
sys_mutex_lock(&event_lock);
entity_event_t *event = DEQ_HEAD(event_list);
Expand All @@ -104,7 +105,7 @@ QD_EXPORT qd_error_t qd_entity_refresh_begin(PyObject *list) {
return qd_error_code();
}

QD_EXPORT void qd_entity_refresh_end(void)
QD_EXPORT void qd_entity_refresh_end(void) TA_REL(event_lock)
{
sys_mutex_unlock(&event_lock);
}
30 changes: 17 additions & 13 deletions src/log.c
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@

const char *QD_LOG_STATS_TYPE = "logStats";

static qd_log_source_t *default_log_source=0;
static qd_log_source_t *default_log_source = 0;

int qd_log_max_len(void)
{
Expand Down Expand Up @@ -156,7 +156,7 @@ qd_log_module_t get_log_module_from_module_name(char *module_name)
return -1;
}

static void qd_log_entry_free_lh(qd_log_entry_t *entry)
static void qd_log_entry_free_lh(qd_log_entry_t *entry) TA_REQ(log_source_lock)
{
DEQ_REMOVE(entries, entry);
free(entry->file);
Expand All @@ -178,14 +178,16 @@ void qd_log_formatted_time(const struct timeval *time, char *buf, size_t buflen)
snprintf(buf, buflen, fmt, time->tv_usec);
}

static log_sink_t* find_log_sink_lh(const char* name) {
static log_sink_t *find_log_sink_lh(const char *name) TA_REQ(log_source_lock)
{
log_sink_t* sink = DEQ_HEAD(sink_list);
DEQ_FIND(sink, strcmp(sink->name, name) == 0);
return sink;
}

// Must hold the log_source_lock
static void log_sink_free_lh(log_sink_t* sink) {
static void log_sink_free_lh(log_sink_t *sink) TA_REQ(log_source_lock)
{
if (!sink) return;
assert(sink->ref_count);

Expand All @@ -200,7 +202,8 @@ static void log_sink_free_lh(log_sink_t* sink) {
}
}

static log_sink_t* log_sink_lh(const char* name) {
static log_sink_t *log_sink_lh(const char *name) TA_REQ(log_source_lock)
{
log_sink_t* sink = find_log_sink_lh(name);
if (sink) {
sys_atomic_inc(&sink->ref_count);
Expand Down Expand Up @@ -312,7 +315,7 @@ static bool default_bool(int value, int default_value) {

// Format and output the log message to the log_source. Expects the log_source_lock is held.
//
static void write_log_lh(qd_log_source_t *log_source, qd_log_entry_t *entry)
static void write_log_lh(qd_log_source_t *log_source, qd_log_entry_t *entry) TA_REQ(log_source_lock)
{
log_sink_t* sink = log_source->sink ? log_source->sink : default_log_source->sink;

Expand Down Expand Up @@ -394,7 +397,8 @@ static void write_log_lh(qd_log_source_t *log_source, qd_log_entry_t *entry)
}

/// Reset the log source to the default state
static void qd_log_source_defaults(qd_log_source_t *log_source) {
static void qd_log_source_defaults(qd_log_source_t *log_source) TA_REQ(log_source_lock)
{
log_source->mask = -1;
log_source->includeTimestamp = -1;
log_source->includeSource = -1;
Expand All @@ -403,7 +407,7 @@ static void qd_log_source_defaults(qd_log_source_t *log_source) {
}

/// Caller must hold the log_source_lock
static qd_log_source_t *qd_log_source_lh(qd_log_module_t module)
static qd_log_source_t *qd_log_source_lh(qd_log_module_t module) TA_REQ(log_source_lock)
{
qd_log_source_t *log_source = log_sources[module];

Expand Down Expand Up @@ -438,7 +442,7 @@ qd_log_source_t *qd_log_source_reset(qd_log_module_t module)
return src;
}

static void qd_log_source_free_lh(qd_log_module_t module)
static void qd_log_source_free_lh(qd_log_module_t module) TA_REQ(log_source_lock)
{
qd_log_source_t *src = log_sources[module];
if (src) {
Expand All @@ -457,7 +461,7 @@ bool qd_log_enabled(qd_log_module_t module, qd_log_level_t level)
return level & mask;
}

bool log_enabled_lh(qd_log_source_t *source, qd_log_level_t level)
bool log_enabled_lh(qd_log_source_t *source, qd_log_level_t level) TA_REQ(log_source_lock)
{
if (!source)
return false;
Expand Down Expand Up @@ -570,7 +574,7 @@ QD_EXPORT PyObject *qd_log_recent_py(long limit) {
return NULL;
}

void qd_log_initialize(void)
void qd_log_initialize(void) TA_NO_THREAD_SAFETY_ANALYSIS
{
DEQ_INIT(entries);
DEQ_INIT(sink_list);
Expand All @@ -590,8 +594,8 @@ void qd_log_initialize(void)
default_log_source->sink = log_sink_lh(SINK_STDERR);
}


void qd_log_finalize(void) {
void qd_log_finalize(void) TA_NO_THREAD_SAFETY_ANALYSIS
{
for (int i = 0; i < NUM_LOG_SOURCES; i++)
qd_log_source_free_lh(i);
while (DEQ_HEAD(entries))
Expand Down
Loading