在C++中竣事无锁部队是一个复杂的任务,因为无锁编程触及到复杂的内存操作和同步机制,需要深远相识并发编程和硬件级别的原子操作。无锁部队的上风在于高并发场景下不错提供更好的性能,因为它幸免了传统锁机制带来的线程竞争和荆棘文切换支出。
C++无锁部队是一种多线程编程本事,它不错在不使用锁的情况下竣事线程安全的部队。它不错提高多线程门径的性能,无锁部队的主要想想是让多个线程同期看望部队,而不需要使用锁来保护分享资源。这不错幸免锁竞争和死锁等问题,从而提高门径的成果。竣事无锁部队平凡需要使用C++11或更高版块的原子操作库(如<atomic>),以及一些高等的并发截止本事。
一、无锁部队综合无锁部队(Lock-Free Queue)是一种并发数据结构,用于在多线程环境下竣事高效的数据交换。与传统的基于锁的部队比较,无锁部队使用了一些特殊的算法和本事,幸免了线程之间的互斥操作,从而提高了并发性能和反应性。
无锁部队平凡基于原子操作(atomic operations)或其他底层同步原语来竣事,何况它们罗致一些玄妙的设施来确保操作的正确性。主要想想是通过使用原子读写操作或雷同的机制,在莫得显式锁定通盘部队的情况下竣事线程安全。
典型的无锁部队算法有轮回缓冲区(Circular Buffer)和链表(Linked List)等。轮回缓冲区平凡使用两个指针(head 和 tail)来暴露部队的滥觞和收尾位置,愚弄自旋、CAS (Compare-and-Swap) 等原子操作来进行入队和出队操作。链表则通过愚弄 CAS 操作插入或删除节点来竣事并发看望。
优点:
提供了更好的并发性能,幸免了互斥操作带来的性能瓶颈。
对于高度竞争情况下不错提供更好的可伸缩性。
舛错:
竣事相对复杂,需要酌量并发安全和正确性问题。
在高度竞争的情况下可能出现自旋恭候导致的性能失掉。
二、无锁部队旨趣无锁部队的旨趣是通过使用原子操作(atomic operations)或其他底层同步原语来竣事并发安全。它们幸免了传统锁机制中的互斥操作,以提高并发性能和反应性。
典型的无锁部队算法有轮回缓冲区(Circular Buffer)和链表(Linked List)等。
在轮回缓冲区的竣事中,平凡使用两个指针来暴露部队的滥觞位置和收尾位置,即头指针(head)和尾指针(tail)。入队时,通过自旋、CAS (Compare-and-Swap) 等原子操作更新尾指针,并将元素放入相应位置。出队时,一样愚弄原子操作更新头指针,并复返对应位置上的元素。
链表竣事无锁部队时,在插入或删除节点时使用 CAS 操作来确保唯有一个线程到手修改节点的指针值。这样不错幸免对通盘链表进行加锁操作。
不管是轮回缓冲区如故链表竣事,要道点在于如何愚弄原子操作确保不同线程之间的联接与一致性。需要仔细处理并发情况下可能出现的竞争条款,并打算合适的算法来保证正确性和性能。
2.1部队操作模子部队是一种相配进军的数据结构,其脾性是先进先出(FIFO),适当活水线业务经由。在进度间通讯、集会通讯间通常罗致部队作念缓存,缓解数据处理压力。左证操作部队的场景分为:单分娩者——单消耗者、多分娩者——单消耗者、单分娩者——多消耗者、多分娩者——多消耗者四大模子。左证部队中数据分为:部队中的数据是定长的、部队中的数据是变长的。
(1)单分娩者——单消耗者
图片
(2)多分娩者——单消耗者
图片
(3)单分娩者——多消耗者
图片
(4)多分娩者——多消耗者
图片
2.2CAS操作CAS即Compare and Swap,是所有CPU提示都因循CAS的原子操作(X86中CMPXCHG汇编提示),用于竣事竣事各式无锁(lock free)数据结构。
CAS操作的C话语竣事如下:
bool compare_and_swap ( int *memory_location, int expected_value, int new_value){ if (*memory_location == expected_value) { *memory_location = new_value; return true; } return false;}
CAS用于搜检一个内存位置是否包含预期值,若是包含,则把新值复赋值到内存位置。到手复返true,失败复返false。
(1)GGC对CAS因循,GCC4.1+版块中因循CAS原子操作。
bool __sync_bool_compare_and_swap (type *ptr, type oldval type newval, ...);type __sync_val_compare_and_swap (type *ptr, type oldval type newval, ...);
(2)Windows对CAS因循,Windows中使用Windows API因循CAS。
LONG InterlockedCompareExchange( LONG volatile *Destination, LONG ExChange, LONG Comperand);
(3)C11对CAS因循,C11 STL中atomic函数因循CAS并不错跨平台。
template< class T >bool atomic_compare_exchange_weak( std::atomic* obj,T* expected, T desired );template< class T >bool atomic_compare_exchange_weak( volatile std::atomic* obj,T* expected, T desired );
其它原子操作如下:
Fetch-And-Add:一般用来对变量作念+1的原子操作
Test-and-set:写值到某个内存位置并传回其旧值
2.3部队数据定长与变长(1)部队数据定长
图片
(2)部队数据变长
图片
三、无锁部队决策3.1boost决策boost提供了三种无锁决策,辞别适用不同使用场景。
boost::lockfree::queue是因循多个分娩者和多个消耗者线程的无锁部队。
boost::lockfree::stack是因循多个分娩者和多个消耗者线程的无锁栈。
boost::lockfree::spsc_queue是仅因循单个分娩者和单个消耗者线程的无锁部队,比boost::lockfree::queue性能更好。
Boost无锁数据结构的API通过轻量级原子锁竣事lock-free,不是简直预料的无锁。
Boost提供的queue不错开垦开动容量,添加新元素时若是容量不够,则总容量自动增长;但对于无锁数据结构,添加新元素时若是容量不够,总容量不会自动增长。
3.2并发部队ConcurrentQueue罗致了无锁算法来竣事并发操作。它基于CAS(Compare-and-Swap)原子操作和其他底层同步原语来保证线程安全性。具体来说,它使用自旋锁和原子提示来确保对部队的修改是原子的,何况在多个线程之间分享数据时提供正确性保证。
ConcurrentQueue是基于C++竣事的工业级无锁部队决策。GitHub:https://github.com/cameron314/concurrentqueueReaderWriterQueue是基于C++竣事的单分娩者单消耗者场景的无锁部队决策。GitHub:https://github.com/cameron314/readerwriterqueue
ConcurrentQueue具有以下特色:
线程安全:多个线程不错同期对部队进行操作而无需额外加锁。
无窒碍:入队和出队操作平凡诟谇窒碍的,何况具有较低的支出。
先进先出(FIFO)法例:元素按照插入法例成列,在出队时会复返最早入队的元素。
使用ConcurrentQueue不错通俗地处理多个线程之间分享数据,并减少由于加锁引起的性能支出。但需要凝视,固然ConcurrentQueue提供了高效、线程安全的并发操作,但在某些特定情况下可能不适当所有应用场景,因此在选拔数据结构时需要左证具体需求进行评估。
3.3DisruptorDisruptor是一种高性能的并发编程框架,用于竣事无锁(lock-free)的并发数据结构。它领先由LMAX Exchange开发,并成为了其中枢交游引擎的要道组件。
Disruptor旨在惩处在高度多线程环境下的数据分享和通讯问题。它基于环形缓冲区(Ring Buffer)和事件驱动模子,通过优化内存看望和线程疏浚,提供了相配高效的音书传递机制。
Disruptor是英国际汇交游公司LMAX基于JAVA开发的一个高性能部队。GitHub:https://github.com/LMAX-Exchange/disruptor
主要特色如下:
无锁打算:Disruptor使用CAS(Compare-and-Swap)等无锁算法来幸免使用传统锁带来的竞争和窒碍。
高隐晦量:Disruptor愚弄环形缓冲区和预分拨内存等本事,在保证正确性前提下追求尽可能高的处理速率。
低延长:由于无锁打算和紧凑的内存布局,Disruptor大略竣事相配低的音书处理延长。
线程间联接:Disruptor提供了纯真而重大的事件发布、消耗者恭候及触发机制,可用于竣事复杂的线程间通讯风光。
使用Disruptor不错灵验地惩处分娩者-消耗者模子中数据传递过程中的性能瓶颈,尽头适用于高并发、低延长的应用场景,举例金融交游系统、音书部队等。可是,由于Disruptor对编程模子和相识要求较高,使用时需要仔细酌量,并左证具体需求评估是否适当。
四、无锁部队竣事4.1环形缓冲区RingBuffer是分娩者和消耗者模子中常用的数据结构,分娩者将数据追加到数组尾端,当达到数组的尾部时,分娩者绕回到数组的头部;消耗者从数组头端取走数据,当到达数组的尾部时,消耗者绕回到数组头部。
若是唯有一个分娩者和一个消耗者,环形缓冲区不错无锁看望,环形缓冲区的写入index只允许分娩者看望并修改,只消分娩者在更新index前将新的值保存到缓冲区中,则消耗者将恒久看到一致的数据结构;读取index也只允许消耗者看望并修改,消耗者只消在取走数据后更新读index,则分娩者将恒久看到一致的数据结构。
空部队时,front与rear异常;当有元素进队,则rear后移;有元素出队,则front后移。
空部队时,rear等于front;满部队时,部队尾部空一个位置,因此判断轮回部队满时使用(rear-front+maxn)%maxn。
入队操作:
data[rear] = x;rear = (rear+1)%maxn;
出队操作:
x = data[front];front = (front+1)%maxn;
单分娩者单消耗者
对于单分娩者和单消耗者场景,由于read_index和write_index都只会有一个线程写,因此不需要加锁也不需要原子操作,平直修改即可,但读写数据时需要酌量碰到数组尾部的情况。
线程对write_index和read_index的读写操作如下:
(1)写操作。先判断部队时否为满,若是部队未满,则先写数据,写完数据后再修改write_index。
(2)读操作。先判断部队是否为空,若是部队不为空,则先读数据,读完再修改read_index。
多分娩者单消耗者
多分娩者和单消耗者场景中,由于多个分娩者都会修改write_index,是以在不加锁的情况下必须使用原子操作。
4.2RingBuffer竣事RingBuffer.hpp文献:
#pragma once template <class T>class RingBuffer{public: RingBuffer(unsigned size): m_size(size), m_front(0), m_rear(0) { m_data = new T[size]; } ~RingBuffer() { delete [] m_data; m_data = NULL; } inline bool isEmpty() const { return m_front == m_rear; } inline bool isFull() const { return m_front == (m_rear + 1) % m_size; } bool push(const T& value) { if(isFull()) { return false; } m_data[m_rear] = value; m_rear = (m_rear + 1) % m_size; return true; } bool push(const T* value) { if(isFull()) { return false; } m_data[m_rear] = *value; m_rear = (m_rear + 1) % m_size; return true; } inline bool pop(T& value) { if(isEmpty()) { return false; } value = m_data[m_front]; m_front = (m_front + 1) % m_size; return true; } inline unsigned int front()const { return m_front; } inline unsigned int rear()const { return m_rear; } inline unsigned int size()const { return m_size; }private: unsigned int m_size;// 部队长度 int m_front;// 部队头部索引 int m_rear;// 部队尾部索引 T* m_data;// 数据缓冲区};
RingBufferTest.cpp测试代码:
#include <stdio.h>#include <thread>#include <unistd.h>#include <sys/time.h>#include "RingBuffer.hpp" class Test{public: Test(int id = 0, int value = 0) { this->id = id; this->value = value; sprintf(data, "id = %d, value = %d\n", this->id, this->value); } void display() { printf("%s", data); }private: int id; int value; char data[128];}; double getdetlatimeofday(struct timeval *begin, struct timeval *end){ return (end->tv_sec + end->tv_usec * 1.0 / 1000000) - (begin->tv_sec + begin->tv_usec * 1.0 / 1000000);} RingBuffer<Test> queue(1 << 12);2u000 #define N (10 * (1 << 20)) void produce(){ struct timeval begin, end; gettimeofday(&begin, NULL); unsigned int i = 0; while(i < N) { if(queue.push(Test(i % 1024, i))) { i++; } } gettimeofday(&end, NULL); double tm = getdetlatimeofday(&begin, &end); printf("producer tid=%lu %f MB/s %f msg/s elapsed= %f size= %u\n", pthread_self(), N * sizeof(Test) * 1.0 / (tm * 1024 * 1024), N * 1.0 / tm, tm, i);} void consume(){ sleep(1); Test test; struct timeval begin, end; gettimeofday(&begin, NULL); unsigned int i = 0; while(i < N) { if(queue.pop(test)) { // test.display(); i++; } } gettimeofday(&end, NULL); double tm = getdetlatimeofday(&begin, &end); printf("consumer tid=%lu %f MB/s %f msg/s elapsed= %f, size=%u \n", pthread_self(), N * sizeof(Test) * 1.0 / (tm * 1024 * 1024), N * 1.0 / tm, tm, i);} int main(int argc, char const *argv[]){ std::thread producer1(produce); std::thread consumer(consume); producer1.join(); consumer.join(); return 0;}
编译:
g++ --std=c++11 RingBufferTest.cpp -o test -pthread
单分娩者单消耗者场景下,音书隐晦量为350万条/秒左右。
4.3LockFreeQueue竣事LockFreeQueue.hpp:
#include <stdio.h>#include <stdlib.h>#include <string.h>#include <unistd.h>#include <fcntl.h>#include <stdbool.h>#include <sys/stat.h>#include <sys/types.h>#include <sys/time.h>#include <sys/mman.h> #define SHM_NAME_LEN 128#define MIN(a, b) ((a) > (b) ? (b) : (a))#define IS_POT(x) ((x) && !((x) & ((x)-1)))#define MEMORY_BARRIER __sync_synchronize() template <class T>class LockFreeQueue{protected: typedef struct { int m_lock; inline void spinlock_init() { m_lock = 0; } inline void spinlock_lock() { while(!__sync_bool_compare_and_swap(&m_lock, 0, 1)) {} } inline void spinlock_unlock() { __sync_lock_release(&m_lock); } } spinlock_t; public: // size:部队大小 // name:分享内存key的旅途称号,默许为NULL,使用数组动作底层缓冲区。 LockFreeQueue(unsigned int size, const char* name = NULL) { memset(shm_name, 0, sizeof(shm_name)); createQueue(name, size); } ~LockFreeQueue() { if(shm_name[0] == 0) { delete [] m_buffer; m_buffer = NULL; } else { if (munmap(m_buffer, m_size * sizeof(T)) == -1) { perror("munmap"); } if (shm_unlink(shm_name) == -1) { perror("shm_unlink"); } } } bool isFull()const {#ifdef USE_POT return m_head == (m_tail + 1) & (m_size - 1);#else return m_head == (m_tail + 1) % m_size;#endif } bool isEmpty()const { return m_head == m_tail; } unsigned int front()const { return m_head; } unsigned int tail()const { return m_tail; } bool push(const T& value) {#ifdef USE_LOCK m_spinLock.spinlock_lock();#endif if(isFull()) {#ifdef USE_LOCK m_spinLock.spinlock_unlock();#endif return false; } memcpy(m_buffer + m_tail, &value, sizeof(T));#ifdef USE_MB MEMORY_BARRIER;#endif #ifdef USE_POT m_tail = (m_tail + 1) & (m_size - 1);#else m_tail = (m_tail + 1) % m_size;#endif #ifdef USE_LOCK m_spinLock.spinlock_unlock();#endif return true; } bool pop(T& value) {#ifdef USE_LOCK m_spinLock.spinlock_lock();#endif if (isEmpty()) {#ifdef USE_LOCK m_spinLock.spinlock_unlock();#endif return false; } memcpy(&value, m_buffer + m_head, sizeof(T));#ifdef USE_MB MEMORY_BARRIER;#endif #ifdef USE_POT m_head = (m_head + 1) & (m_size - 1);#else m_head = (m_head + 1) % m_size;#endif #ifdef USE_LOCK m_spinLock.spinlock_unlock();#endif return true; } protected: virtual void createQueue(const char* name, unsigned int size) {#ifdef USE_POT if (!IS_POT(size)) { size = roundup_pow_of_two(size); }#endif m_size = size; m_head = m_tail = 0; if(name == NULL) { m_buffer = new T[m_size]; } else { int shm_fd = shm_open(name, O_CREAT | O_RDWR, 0666); if (shm_fd < 0) { perror("shm_open"); } if (ftruncate(shm_fd, m_size * sizeof(T)) < 0) { perror("ftruncate"); close(shm_fd); } void *addr = mmap(0, m_size * sizeof(T), PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0); if (addr == MAP_FAILED) { perror("mmap"); close(shm_fd); } if (close(shm_fd) == -1) { perror("close"); exit(1); } m_buffer = static_cast<T*>(addr); memcpy(shm_name, name, SHM_NAME_LEN - 1); }#ifdef USE_LOCK spinlock_init(m_lock);#endif } inline unsigned int roundup_pow_of_two(size_t size) { size |= size >> 1; size |= size >> 2; size |= size >> 4; size |= size >> 8; size |= size >> 16; size |= size >> 32; return size + 1; }protected: char shm_name[SHM_NAME_LEN]; volatile unsigned int m_head; volatile unsigned int m_tail; unsigned int m_size;#ifdef USE_LOCK spinlock_t m_spinLock;#endif T* m_buffer;};
#define USE_LOCK开启spinlock锁,多分娩者多消耗者场景#define USE_MB开启Memory Barrier#define USE_POT开启部队大小的2的幂对皆
LockFreeQueueTest.cpp测试文献:
#include "LockFreeQueue.hpp"#include <thread> //#define USE_LOCK class Test{public: Test(int id = 0, int value = 0) { this->id = id; this->value = value; sprintf(data, "id = %d, value = %d\n", this->id, this->value); } void display() { printf("%s", data); }private: int id; int value; char data[128];}; double getdetlatimeofday(struct timeval *begin, struct timeval *end){ return (end->tv_sec + end->tv_usec * 1.0 / 1000000) - (begin->tv_sec + begin->tv_usec * 1.0 / 1000000);} LockFreeQueue<Test> queue(1 << 10, "/shm"); #define N ((1 << 20)) void produce(){ struct timeval begin, end; gettimeofday(&begin, NULL); unsigned int i = 0; while(i < N) { if(queue.push(Test(i >> 10, i))) i++; } gettimeofday(&end, NULL); double tm = getdetlatimeofday(&begin, &end); printf("producer tid=%lu %f MB/s %f msg/s elapsed= %f size= %u\n", pthread_self(), N * sizeof(Test) * 1.0 / (tm * 1024 * 1024), N * 1.0 / tm, tm, i);} void consume(){ Test test; struct timeval begin, end; gettimeofday(&begin, NULL); unsigned int i = 0; while(i < N) { if(queue.pop(test)) { //test.display(); i++; } } gettimeofday(&end, NULL); double tm = getdetlatimeofday(&begin, &end); printf("consumer tid=%lu %f MB/s %f msg/s elapsed= %f size= %u\n", pthread_self(), N * sizeof(Test) * 1.0 / (tm * 1024 * 1024), N * 1.0 / tm, tm, i);} int main(int argc, char const *argv[]){ std::thread producer1(produce); //std::thread producer2(produce); std::thread consumer(consume); producer1.join(); //producer2.join(); consumer.join(); return 0;}
多线程场景下,需要界说USE_LOCK宏,开启锁保护。
编译:
g++ --std=c++11 -O3 LockFreeQueueTest.cpp -o test -lrt -pthread五、kfifo内核部队
计较机科学家照旧确认,当唯有一个读线程和一个写线程并发操作时,不需要任何额外的锁,就不错确保是线程安全的,也即kfifo使用了无锁编程本事,以提高kernel的并发。
Linux kernel内部从来就不穷困神圣,优雅和高效的代码,仅仅咱们穷困发现和试吃的目光。在Linux kernel内部,神圣并不暴露代码使用神出鬼没的超然妙技,违犯,它使用的不外是全球相配熟习的基础数据结构,但是kernel开发者能从基础的数据结构中,索取出优好意思的脾性。
kfifo即是这样的一类优好意思代码,它十分心圣,绝无迷漫的一瞥代码,却相配高效。
对于kfifo信息如下:
本文分析的原代码版块: 2.6.24.4kfifo的界说文献: kernel/kfifo.ckfifo的头文献: include/linux/kfifo.h
kfifo是Linux内核的一个FIFO数据结构,物联网软件开发公司罗致环形轮回部队的数据结构来竣事,提供一个恢弘界的字节流处事,何况使用并行无锁编程本事,即单分娩者单消耗者场景下两个线程不错并发操作,不需要任何加锁行径就不错保证kfifo线程安全。
kfifo代码既然肩负着这样多脾性,那咱们先一敝它的代码:
struct kfifo { unsigned char *buffer; /* the buffer holding the data */ unsigned int size; /* the size of the allocated buffer */ unsigned int in; /* data is added at offset (in % size) */ unsigned int out; /* data is extracted from off. (out % size) */ spinlock_t *lock; /* protects concurrent modifications */};
这是kfifo的数据结构,kfifo主要提供了两个操作,__kfifo_put(入队操作)和__kfifo_get(出队操作)。 它的各个数据成员如下:
buffer: 用于存放数据的缓存
size: buffer空间的大小,在初化时,将它朝上推广成2的幂(如5,朝上推广 与它最接近的值且是2的n次方的值是2^3,即8)
lock: 若是使用不成保证任何时辰最多唯有一个读线程和写线程,需要使用该lock实施同步。
in, out: 和buffer一皆组成一个轮回部队。 in指向buffer中队头,而且out指向buffer中的队尾
它的结构如示图如下:
+--------------------------------------------------------------+| |<----------data---------->| |+--------------------------------------------------------------+ ^ ^ ^ | | | out in size
天然,内核开发者使用了一种更好的本事处理了in, out和buffer的筹划,咱们将鄙人面进行详确分析。
5.1kfifo功能刻画kfifo提供如下对外功能规格
只因循一个读者和一个读者并发操作
无窒碍的读写操作,若是空间不够,则复返内容看望空间
(1)kfifo_alloc 分拨kfifo内存和开动化责任
struct kfifo *kfifo_alloc(unsigned int size, gfp_t gfp_mask, spinlock_t *lock){ unsigned char *buffer; struct kfifo *ret; /* * round up to the next power of 2, since our 'let the indices * wrap' tachnique works only in this case. */ if (size & (size - 1)) { BUG_ON(size > 0x80000000); size = roundup_pow_of_two(size); } buffer = kmalloc(size, gfp_mask); if (!buffer) return ERR_PTR(-ENOMEM); ret = kfifo_init(buffer, size, gfp_mask, lock); if (IS_ERR(ret)) kfree(buffer); return ret;}
这里值得一提的是,kfifo->size的值老是在调用者传进来的size参数的基础上向2的幂推广(roundup_pow_of_two,我我方的竣事在著作末尾),这是内核一贯的作念法。这样的平正显而易见——对kfifo->size取模运算不错更动为与运算,如下:
kfifo->in % kfifo->size 不错更动为 kfifo->in & (kfifo->size – 1)
在kfifo_alloc函数中,使用size & (size – 1)来判断size 是否为2幂,若是条款为真,则暴露size不是2的幂,然后调用roundup_pow_of_two将之朝上推广为2的幂。
这都是常用的妙技,只不外全球莫得将它们聚合起来使用资料,底下要分析的__kfifo_put和__kfifo_get则是将kfifo->size的特色发挥到了极致。
(2)__kfifo_put和__kfifo_get玄妙的入队和出队
__kfifo_put是入队操作,它先将数据放入buffer内部,临了才修改in参数;__kfifo_get是出队操作,它先将数据从buffer中移走,临了才修改out。(确保即使in和out修改失败,也不错再来一遍)
你会发现in和out两者各司其职。底下是__kfifo_put和__kfifo_get的代码
unsigned int __kfifo_put(struct kfifo *fifo, unsigned char *buffer, unsigned int len){ unsigned int l; len = min(len, fifo->size - fifo->in + fifo->out); /* * Ensure that we sample the fifo->out index -before- we * start putting bytes into the kfifo. */ smp_mb(); /* first put the data starting from fifo->in to buffer end */ l = min(len, fifo->size - (fifo->in & (fifo->size - 1))); memcpy(fifo->buffer + (fifo->in & (fifo->size - 1)), buffer, l); /* then put the rest (if any) at the beginning of the buffer */ memcpy(fifo->buffer, buffer + l, len - l); /* * Ensure that we add the bytes to the kfifo -before- * we update the fifo->in index. */ smp_wmb(); fifo->in += len; return len;}
奇怪吗?代码十足是线性结构,莫得任何if-else分支来判断是否有足够的空间存放数据。内核在这里的代码相配神圣,莫得一瞥迷漫的代码。
l = min(len, fifo->size - (fifo->in & (fifo->size - 1)));
这个抒发式计较现时写入的空间,换成东谈主可相识的话语即是:
l = kfifo可写空间和预期写入空间的最小值
(3)使用min宏来代if-else分支
__kfifo_get也应用了一样妙技,代码如下:
unsigned int __kfifo_get(struct kfifo *fifo, unsigned char *buffer, unsigned int len){ unsigned int l; len = min(len, fifo->in - fifo->out); /* * Ensure that we sample the fifo->in index -before- we * start removing bytes from the kfifo. */ smp_rmb(); /* first get the data from fifo->out until the end of the buffer */ l = min(len, fifo->size - (fifo->out & (fifo->size - 1))); memcpy(buffer, fifo->buffer + (fifo->out & (fifo->size - 1)), l); /* then get the rest (if any) from the beginning of the buffer */ memcpy(buffer + l, fifo->buffer, len - l); /* * Ensure that we remove the bytes from the kfifo -before- * we update the fifo->out index. */ smp_mb(); fifo->out += len; return len;}
珍藏读两遍吧,我也读了屡次,每次老是有新发现,因为in, out和size的筹划太玄妙了,居然能愚弄上unsigned int回绕的脾性。
原来,kfifo每次入队或出队,kfifo->in或kfifo->out仅仅简略地kfifo->in/kfifo->out += len,并莫得对kfifo->size 进行取模运算。因此kfifo->in和kfifo->out老是一直增大,直到unsigned in最大值时,又会绕回到0这一肇始端。但恒久舒服:
kfifo->in - kfifo->out <= kfifo->size即使kfifo->in回绕到了0的那一端,这个性质仍然是保抓的。
对于给定的kfifo:
数据空间长度为:kfifo->in - kfifo->out而剩余空间(可写入空间)长度为:kfifo->size - (kfifo->in - kfifo->out)
尽管kfifo->in和kfofo->out一直高出kfifo->size进行增长,但它对应在kfifo->buffer空间的下标却是如下:
kfifo->in % kfifo->size (i.e. kfifo->in & (kfifo->size - 1))kfifo->out % kfifo->size (i.e. kfifo->out & (kfifo->size - 1))
往kfifo内部写一块数据时,数据空间、写入空间和kfifo->size的筹划若是舒服:
kfifo->in % size + len > size
那就要作念写拆分了,见下图:
双色球第2024079期红球号码012路分析
kfifo_put(写)空间滥觞地址 | \_/ |XXXXXXXXXXXXXXXXXX| +--------------------------------------------------------------+| |<----------data---------->| |+--------------------------------------------------------------+ ^ ^ ^ | | | out%size in%size size ^ | 写空间收尾地址
第一块天然是: [kfifo->in % kfifo->size, kfifo->size]第二块天然是:[0, len - (kfifo->size - kfifo->in % kfifo->size)]
底下是代码,细细体味吧:
/* first put the data starting from fifo->in to buffer end */ l = min(len, fifo->size - (fifo->in & (fifo->size - 1))); memcpy(fifo->buffer + (fifo->in & (fifo->size - 1)), buffer, l); /* then put the rest (if any) at the beginning of the buffer */ memcpy(fifo->buffer, buffer + l, len - l);
对于kfifo_get过程,亦然雷同的,请列位自行分析。
(4)kfifo_get和kfifo_put无锁并发操作
计较机科学家照旧确认,当唯有一个读经程和一个写线程并发操作时,不需要任何额外的锁,就不错确保是线程安全的,也即kfifo使用了无锁编程本事,以提高kernel的并发。
kfifo使用in和out两个指针来刻画写入和读取游标,对于写入操作,只更新in指针,而读取操作,只更新out指针,可谓相收成彰,暴露图如下:
|<--写入-->|+--------------------------------------------------------------+| |<----------data----->| |+--------------------------------------------------------------+ |<--读取-->| ^ ^ ^ | | | out in size
为了幸免读者看到写者瞻望写入,但内容莫得写入数据的空间,写者必须保证以下的写入法例:
往[kfifo->in, kfifo->in + len]空间写入数据更新kfifo->in指针为 kfifo->in + len
在操作1完成时,读者是还莫得看到写入的信息的,因为kfifo->in莫得变化,合计读者还莫得滥觞写操作,唯有更新kfifo->in之后,读者智力看到。
那么如何保证1必须在2之前完成,神秘即是使用内存樊篱:smp_mb(),smp_rmb(), smp_wmb(),来保证对方不雅察到的内存操作法例。
5.2kfifo内核部队竣事kfifo数据结构界说如下:
struct kfifo{ unsigned char *buffer; unsigned int size; unsigned int in; unsigned int out; spinlock_t *lock;}; // 创建部队struct kfifo *kfifo_init(unsigned char *buffer, unsigned int size, gfp_t gfp_mask, spinlock_t *lock){ struct kfifo *fifo; // 判断是否为2的幂 BUG_ON(!is_power_of_2(size)); fifo = kmalloc(sizeof(struct kfifo), gfp_mask); if (!fifo) return ERR_PTR(-ENOMEM); fifo->buffer = buffer; fifo->size = size; fifo->in = fifo->out = 0; fifo->lock = lock; return fifo;} // 分拨空间struct kfifo *kfifo_alloc(unsigned int size, gfp_t gfp_mask, spinlock_t *lock){ unsigned char *buffer; struct kfifo *ret; // 判断是否为2的幂 if (!is_power_of_2(size)) { BUG_ON(size > 0x80000000); // 朝上推广成2的幂 size = roundup_pow_of_two(size); } buffer = kmalloc(size, gfp_mask); if (!buffer) return ERR_PTR(-ENOMEM); ret = kfifo_init(buffer, size, gfp_mask, lock); if (IS_ERR(ret)) kfree(buffer); return ret;} void kfifo_free(struct kfifo *fifo){ kfree(fifo->buffer); kfree(fifo);} // 入队操作static inline unsigned int kfifo_put(struct kfifo *fifo, const unsigned char *buffer, unsigned int len){ unsigned long flags; unsigned int ret; spin_lock_irqsave(fifo->lock, flags); ret = __kfifo_put(fifo, buffer, len); spin_unlock_irqrestore(fifo->lock, flags); return ret;} // 出队操作static inline unsigned int kfifo_get(struct kfifo *fifo, unsigned char *buffer, unsigned int len){ unsigned long flags; unsigned int ret; spin_lock_irqsave(fifo->lock, flags); ret = __kfifo_get(fifo, buffer, len); //当fifo->in == fifo->out时,buufer为空 if (fifo->in == fifo->out) fifo->in = fifo->out = 0; spin_unlock_irqrestore(fifo->lock, flags); return ret;} // 入队操作unsigned int __kfifo_put(struct kfifo *fifo, const unsigned char *buffer, unsigned int len){ unsigned int l; //buffer中空的长度 len = min(len, fifo->size - fifo->in + fifo->out); // 内存樊篱:smp_mb(),smp_rmb(), smp_wmb()来保证对方不雅察到的内存操作法例 smp_mb(); // 将数据追加到部队尾部 l = min(len, fifo->size - (fifo->in & (fifo->size - 1))); memcpy(fifo->buffer + (fifo->in & (fifo->size - 1)), buffer, l); memcpy(fifo->buffer, buffer + l, len - l); smp_wmb(); //每次累加,到达最大值后溢出,自动转为0 fifo->in += len; return len;} // 出队操作unsigned int __kfifo_get(struct kfifo *fifo, unsigned char *buffer, unsigned int len){ unsigned int l; //独特据的缓冲区的长度 len = min(len, fifo->in - fifo->out); smp_rmb(); l = min(len, fifo->size - (fifo->out & (fifo->size - 1))); memcpy(buffer, fifo->buffer + (fifo->out & (fifo->size - 1)), l); memcpy(buffer + l, fifo->buffer, len - l); smp_mb(); fifo->out += len; //每次累加,到达最大值后溢出,自动转为0 return len;} static inline void __kfifo_reset(struct kfifo *fifo){ fifo->in = fifo->out = 0;} static inline void kfifo_reset(struct kfifo *fifo){ unsigned long flags; spin_lock_irqsave(fifo->lock, flags); __kfifo_reset(fifo); spin_unlock_irqrestore(fifo->lock, flags);} static inline unsigned int __kfifo_len(struct kfifo *fifo){ return fifo->in - fifo->out;} static inline unsigned int kfifo_len(struct kfifo *fifo){ unsigned long flags; unsigned int ret; spin_lock_irqsave(fifo->lock, flags); ret = __kfifo_len(fifo); spin_unlock_irqrestore(fifo->lock, flags); return ret;}5.3kfifo打算重点
(1)保证buffer size为2的幂
kfifo->size值在调用者传递参数size的基础上向2的幂推广,地方是使kfifo->size取模运算不错更动为位与运算(提高运行成果)。kfifo->in % kfifo->size更动为 kfifo->in & (kfifo->size – 1)
保证size是2的幂不错通过位运算的表情求余,在频繁操作部队的情况下不错大大提高成果。
(2)使用spin_lock_irqsave与spin_unlock_irqrestore 竣事同步。
Linux内核中有spin_lock、spin_lock_irq和spin_lock_irqsave保证同步。
static inline void __raw_spin_lock(raw_spinlock_t *lock){ preempt_disable(); spin_acquire(&lock->dep_map, 0, 0, _RET_IP_); LOCK_CONTENDED(lock, do_raw_spin_trylock, do_raw_spin_lock);} static inline void __raw_spin_lock_irq(raw_spinlock_t *lock){ local_irq_disable(); preempt_disable(); spin_acquire(&lock->dep_map, 0, 0, _RET_IP_); LOCK_CONTENDED(lock, do_raw_spin_trylock, do_raw_spin_lock);}
spin_lock比spin_lock_irq速率快,但并不是线程安全的。spin_lock_irq加多调用local_irq_disable函数,即退却腹地中断,是线程安全的,既退却腹地中断,又退却内核霸占。
spin_lock_irqsave是基于spin_lock_irq竣事的一个援手接口,在参加和离开临界区后,不会转换中断的开启、关闭景色。
若是自旋锁在中断处理函数中被用到,在得回自旋锁前需要关闭腹地中断,spin_lock_irqsave竣事如下:
A、保存腹地中断景色;
B、关闭腹地中断;
C、得回自旋锁。
解锁时通过 spin_unlock_irqrestore完成开释锁、收复腹地中断到原来景色等责任。
(3)线性代码结构
代码中莫得任何if-else分支来判断是否有足够的空间存放数据,kfifo每次入队或出队仅仅简略的 +len 判断剩余空间,并莫得对kfifo->size 进行取模运算,是以kfifo->in和kfifo->out老是一直增大,直到unsigned in高出最大值时绕回到0这一肇始端,但恒久舒服:kfifo->in - kfifo->out <= kfifo->size。
(4)使用Memory Barrier
mb():适用于多处理器和单处理器的内存樊篱。
rmb():适用于多处理器和单处理器的读内存樊篱。
wmb():适用于多处理器和单处理器的写内存樊篱。
smp_mb():适用于多处理器的内存樊篱。
smp_rmb():适用于多处理器的读内存樊篱。
smp_wmb():适用于多处理器的写内存樊篱。
Memory Barrier使用场景如下:
A、竣事同步原语(synchronization primitives)
B、竣事无锁数据结构(lock-free data structures)
小程序开发C、驱动门径
门径在运行时内存内容看望法例和门径代码编写的看望法例不一定一致,即内存乱序看望。内存乱序看望行径出现是为了栽种门径运行时的性能。内存乱序看望主要发生在两个阶段:
A、编译时,编译器优化导致内存乱序看望(提示重排)。
B、运行时,多CPU间交互引起内存乱序看望。
Memory Barrier大略让CPU或编译器在内存看望上有序。Memory barrier前的内存看望操作必定先于自后的完成。Memory Barrier包括两类:
A、编译器Memory Barrier。
B、CPU Memory Barrier。
平凡,编译器和CPU引起内存乱序看望不会带来问题,但若是门径逻辑的正确性依赖于内存看望法例,内存乱序看望会带来逻辑上的极度。
在编译时,编译器对代码作念出优化时可能转换内容本质提示的法例(如GCC的O2或O3都会转换内容本质提示的法例)。
在运行时,CPU固然会乱序本质提示物联网软件开发公司,但在单个CPU上,硬件大略保证门径本质时所有的内存看望操作都是按门径代码编写的法例本质的,Memory Barrier莫得必要使用(不酌量编译器优化)。为了更快本质提示,CPU采用活水线的本质表情,编译器在编译代码时为了使提示更适当CPU的活水线本质表情以及多CPU本质,底本提示就会出现乱序的情况。在乱序本质时,CPU简直本质提示的法例由可用的输入数据决定,而非门径员编写的法例。
本站仅提供存储处事,所有内容均由用户发布,如发现存害或侵权内容,请点击举报。