muduo日志库分析
- 异步日志机制
- 双缓存机制
- 前台日志写入栈
- 后台日志(落盘)写入栈
- 使用示例
- 总结
- 后言
异步日志机制
通过notify和超时方式唤醒日志落盘线程读取日志写入磁盘。
多线程间使用mutex互斥保证线程安全。
日志写入磁盘时采用批量写入方式。
注意:队列不是每一行日志,而是buffer缓冲区(比如4M)。
双缓存机制
日志写入过程(假设buffer为4M):
(1)加锁,判断当前的buffer是否超过4M。
(2)如果没有超过4M,把日志写入buffer;如果超出4M则把当前的buffer插入到队列中。此时,当前日志写到一个新的buffer(循环复用的buffer)中。
日志notify问题:
(1)写满1个buffer才发一次notify唤醒日志落盘。
(2)超时通过wait_timeout唤醒日志落盘线程,buffer只要有数据就写入到磁盘。
双缓冲机制中循环使用buffer,避免buffer不断分配。
void AsyncLogging::append(const char* logline, int len)
{// if(cnt++ == 50000)abort();MutexLockGuard lock(mutex_); // 多线程加锁if (currentBuffer_->avail() > len) // 判断buffer还有没有空间写入这条日志{currentBuffer_->append(logline, len); // 直接写入}else{buffers_.push_back(std::move(currentBuffer_)); // buffers_是vector,把buffer入队列// printf("push_back append_cnt:%d, size:%d\n", ++append_cnt, buffers_.size());if (nextBuffer_) // 用了双缓存{currentBuffer_ = std::move(nextBuffer_); // 如果不为空则将buffer转移到currentBuffer_}else{// 重新分配buffercurrentBuffer_.reset(new Buffer); // Rarely happens如果后端写入线程没有及时读取数据,那要再分配buffer}currentBuffer_->append(logline, len); // buffer写满了cond_.notify(); // 唤醒写入线程}
}void AsyncLogging::threadFunc()
{assert(running_ == true);latch_.countDown();LogFile output(basename_, rollSize_, false);BufferPtr newBuffer1(new Buffer); // 是给currentBuffer_BufferPtr newBuffer2(new Buffer); // 是给nextBuffer_newBuffer1->bzero();newBuffer2->bzero();BufferVector buffersToWrite; // 保存要写入的日志buffersToWrite.reserve(16);while (running_){assert(newBuffer1 && newBuffer1->length() == 0);assert(newBuffer2 && newBuffer2->length() == 0);assert(buffersToWrite.empty());{ // 锁的作用域MutexLockGuard lock(mutex_);if (buffers_.empty()) // 没有数据可读取,休眠{// printf("waitForSeconds into\n");cond_.waitForSeconds(flushInterval_); // 超时退出或者被唤醒(收到notify)// printf("waitForSeconds leave\n");}buffers_.push_back(std::move(currentBuffer_)); // currentBuffer_被锁住 currentBuffer_被置空// printf("push_back threadFunc:%d, size:%d\n", ++threadFunc_cnt, buffers_.size());currentBuffer_ = std::move(newBuffer1); // currentBuffer_ 需要内存空间buffersToWrite.swap(buffers_); // 用了双队列,把前端日志的队列所有buffer都转移到buffersToWrite队列if (!nextBuffer_) // newBuffer2是给nextBuffer_{nextBuffer_ = std::move(newBuffer2); // 如果为空则使用newBuffer2的缓存空间}}// 从这里是没有锁,数据落盘的时候不要加锁assert(!buffersToWrite.empty());// fixme的操作 4M一个buffer *25 = 100Mif (buffersToWrite.size() > 25) // 这里缓存的数据太多了,比如4M为一个buffer空间,25个buffer就是100M了。{printf("Dropped\n");char buf[256];snprintf(buf, sizeof buf, "Dropped log messages at %s, %zd larger buffers\n",Timestamp::now().toFormattedString().c_str(),buffersToWrite.size()-2); // 只保留2个bufferfputs(buf, stderr);output.append(buf, static_cast<int>(strlen(buf)));buffersToWrite.erase(buffersToWrite.begin()+2, buffersToWrite.end()); // 只保留2个buffer(默认4M)}for (const auto& buffer : buffersToWrite) // 遍历buffer{// FIXME: use unbuffered stdio FILE ? or use ::writev ?output.append(buffer->data(), buffer->length()); // 负责fwrite数据}output.flush(); // 保证数据落到磁盘了if (buffersToWrite.size() > 2){// drop non-bzero-ed buffers, avoid trashingbuffersToWrite.resize(2); // 只保留2个buffer}if (!newBuffer1){assert(!buffersToWrite.empty());newBuffer1 = std::move(buffersToWrite.back()); // 复用buffer对象buffersToWrite.pop_back();newBuffer1->reset(); // 重置}if (!newBuffer2){assert(!buffersToWrite.empty());newBuffer2 = std::move(buffersToWrite.back()); // 复用buffer对象buffersToWrite.pop_back();newBuffer2->reset(); // 重置}buffersToWrite.clear(); }output.flush();
}
前台日志写入栈
流程图:
具体实现流程:
后台日志(落盘)写入栈
流程图:
for (const auto& buffer : buffersToWrite) // 遍历buffer
{// FIXME: use unbuffered stdio FILE ? or use ::writev ?output.append(buffer->data(), buffer->length()); // 负责fwrite数据
}
output.flush(); // 保证数据落到磁盘了
使用示例
#include "AsyncLogging.h"#include <stdio.h>
#include <sys/resource.h>
#include <unistd.h>
#include <sys/time.h>
#include <iostream>
#include "Logging.h"
#define LOG_NUM 5000000 // 总共的写入日志行数using namespace std;off_t kRollSize= 1 * 1000 * 1000; // 只设置1Mstatic AsyncLogging *g_asyncLog = NULL;
static void asyncOutput(const char *msg, int len)
{g_asyncLog->append(msg, len);
}// 时间戳
static uint64_t get_tick_count()
{struct timeval tval;uint64_t ret_tick;gettimeofday(&tval, NULL);ret_tick = tval.tv_sec * 1000L + tval.tv_usec / 1000L;return ret_tick;
}int main(int argc,char*argv[])
{printf("PID = %d\n",getpid());char name[260] = { 0 };strncpy(name, argv[0], sizeof name - 1);// 设置 回滚大小kRollSize(1M), 最大1秒刷一次盘(flush)AsyncLogging log(::basename(name), kRollSize, 1);Logger::setOutput(asyncOutput);g_asyncLog = &log;// 启动日志写入线程log.start();uint64_t begin_time = get_tick_count();cout << "name: " << basename(name) << "\nbegin time: " << begin_time << endl;for (int i = 0; i < LOG_NUM; i++){LOG_INFO << "NO." << i << " Root Error Message!"; // 47个字节}log.stop();uint64_t end_time = get_tick_count();std::cout << "end_time: " << end_time << std::endl;int64_t ops = LOG_NUM;ops = ops * 1000 / (end_time - begin_time);std::cout << "need the time1: " << end_time << " " << begin_time << ", " << end_time - begin_time << "毫秒"<< ", ops = " << ops << "ops/s\n";return 0;
}
总结
(1)日志可以采用批量写入(以数据大小为判断为准)来做到高性能。
同步方式通过攒够数据(比如4M)或者时间超过一定阈值(比如1秒)触发写入。比如glog日志库。
异步方式(比如moduo日志库)采用append积攒数据,异步落盘线程负责数据写入磁盘。
什么时候触发? ------> notify+wait_timeout,即 通知唤醒+超时唤醒。
(2)为减少锁的粒度,减少刷新磁盘的时候日志接口阻塞,采用双队列方式(前台队列+后台刷新磁盘队列,后台队列刷新数据到磁盘)。
(3)内存分配通过move语义避免深拷贝。
(4)log4cpp的日志框架值得参考,但是它的性能不佳,要自己做完善、扩展。
后言
本专栏知识点是通过<零声教育>的系统学习,进行梳理总结写下文章,对c/c++linux系统提升感兴趣的读者,可以点击链接,详细查看详细的服务:C/C++服务器课程