Writing glog text logs on a separate thread
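glog itself is very efficient, but Google's engineers surely know that when logging at scale, writing synchronously through glog slows down the business threads: nobody can afford disk I/O under load. LevelDB, for example, does not sync writes to disk by default, not to mention that Google's "troika" (GFS, MapReduce, Bigtable) are all distributed systems. I read those papers a while ago; they really led their era.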
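Someone raised asynchronous writing in glog's issue tracker, but only vaguely. Version 0.3.3 already has an interface for it; it is not very friendly, but it is fully sufficient for writing disk logs asynchronously.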
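Today I spent some time on it and stepped into a few pitfalls, but it basically works now. Once it is stable I will benchmark this version against glog, g2log and muduo logging. muduo plays a trick with its buffers: internally it keeps two buffer pointers and double-buffers, which is reportedly very efficient. It is Linux-only, but extracting its logging part is not hard. Chen Shuo (muduo's author) wrapped mutex, thread, condition variable and so on, but these are now standard in C++11 and available in gcc 4.8 and clang 3.3 (VS2010 does not ship <thread>/<mutex>; they arrived in VS2012, or can be had through a library such as just::thread), so those wrappers are no longer really necessary. Even the thread-safe init-once on Linux (pthread_once) that people used to praise is now covered by C++11's std::call_once.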
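To make the double-buffering idea concrete, here is a minimal sketch under my own assumptions. It is not muduo's AsyncLogging code, which is more elaborate (fixed-size buffers plus spares); the class name DoubleBufferLog and the Sink callback are hypothetical. Business threads append to a front buffer under a short lock; the background thread swaps it with its own back buffer and does the slow write outside the lock:

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <string>
#include <thread>
#include <utility>

// Hypothetical sketch of double buffering (not muduo's actual code).
class DoubleBufferLog {
 public:
  using Sink = std::function<void(const std::string&)>;  // e.g. fwrite to a file

  explicit DoubleBufferLog(Sink sink)
      : sink_(std::move(sink)), worker_(&DoubleBufferLog::Run, this) {}

  ~DoubleBufferLog() {
    {
      std::lock_guard<std::mutex> lock(mu_);
      stop_ = true;
    }
    cv_.notify_one();
    worker_.join();  // worker drains the front buffer before exiting
  }

  // Front end: called by business threads; holds the lock only to append.
  void Append(const std::string& line) {
    {
      std::lock_guard<std::mutex> lock(mu_);
      front_ += line;
    }
    cv_.notify_one();
  }

 private:
  void Run() {
    std::string back;  // the second buffer, owned by the worker thread
    for (;;) {
      {
        std::unique_lock<std::mutex> lock(mu_);
        cv_.wait(lock, [this] { return stop_ || !front_.empty(); });
        if (front_.empty() && stop_) return;  // everything written, shut down
        front_.swap(back);  // O(1): producers get the emptied buffer back
      }
      sink_(back);   // slow I/O happens outside the lock
      back.clear();  // keeps its capacity for the next swap
    }
  }

  Sink sink_;
  std::mutex mu_;
  std::condition_variable cv_;
  std::string front_;
  bool stop_ = false;
  std::thread worker_;  // declared last, so it starts after the other members exist
};
```

The lock is held only for an append or a swap, so producers never wait for the disk.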
The interface glog lets clients customize is base::Logger:
```cpp
class GOOGLE_GLOG_DLL_DECL Logger {
 public:
  virtual ~Logger();

  // Writes "message[0,message_len-1]" corresponding to an event that
  // occurred at "timestamp".  If "force_flush" is true, the log file
  // is flushed immediately.
  //
  // The input message has already been formatted as deemed
  // appropriate by the higher level logging facility.  For example,
  // textual log messages already contain timestamps, and the
  // file:linenumber header.
  virtual void Write(bool force_flush,
                     time_t timestamp,
                     const char* message,
                     int message_len) = 0;

  // Flush any buffered messages
  virtual void Flush() = 0;

  // Get the current LOG file size.
  // The returned value is approximate since some
  // logged data may not have been flushed to disk yet.
  virtual uint32 LogSize() = 0;

  virtual void SetBasename(const char* basename) = 0;
  virtual void SetExtension(const char* ext) = 0;
  virtual void SetSymlinkBasename(const char* symlink_basename) = 0;
};
```
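I added a few extra interfaces to it for later convenience: SetBasename, SetExtension and SetSymlinkBasename are not part of the stock glog Logger.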
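The Active Object pattern solves this neatly; it is essentially the familiar producer-consumer model. When a LogMessage is destroyed, glog flushes it, which ends up calling the installed logger's Write method. That is our hook: package the data, post it to a background writer thread, and let that thread take it off the queue and do the actual disk write.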
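A minimal, portable sketch of that producer-consumer shape, assuming only the C++11 standard library. The name ActiveObject is hypothetical and this is not the Active class used in this project. The worker sleeps on a condition variable instead of polling, and shutdown works by enqueueing a "quit" job, so everything queued before it is still processed:

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical minimal Active Object: one worker thread, one blocking queue.
class ActiveObject {
 public:
  using Job = std::function<void()>;

  ActiveObject() : worker_(&ActiveObject::Run, this) {}

  ~ActiveObject() {
    Send([this] { done_ = true; });  // quit job runs on the worker itself
    worker_.join();
  }

  ActiveObject(const ActiveObject&) = delete;
  ActiveObject& operator=(const ActiveObject&) = delete;

  // Producer side (e.g. Logger::Write): enqueue and return immediately.
  void Send(Job job) {
    {
      std::lock_guard<std::mutex> lock(mu_);
      queue_.push_back(std::move(job));
    }
    cv_.notify_one();
  }

 private:
  // Consumer side: blocks on the condition variable instead of spinning.
  void Run() {
    while (!done_) {
      Job job;
      {
        std::unique_lock<std::mutex> lock(mu_);
        cv_.wait(lock, [this] { return !queue_.empty(); });
        job = std::move(queue_.front());
        queue_.pop_front();
      }
      job();  // e.g. the actual fwrite to the log file
    }
  }

  std::mutex mu_;
  std::condition_variable cv_;
  std::deque<Job> queue_;
  bool done_ = false;   // read and written only by the worker thread
  std::thread worker_;  // last member: started after the others are constructed
};
```

Because jobs run in FIFO order on a single thread, log records keep the order in which Write was called.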
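I wrapped a simple Active Object, adapted from KjellKod's public-domain example. active.h defines Buffer, which carries one serialized log record from the logging thread to the writer thread, and the callback that processes each Buffer has to be supplied explicitly when the Active object is created. For the inter-thread queue I used Microsoft PPL's Concurrency::concurrent_queue (from <concurrent_queue.h>; it is not part of standard C++11), so there was no need to reinvent the wheel: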
```cpp
/** ==========================================================================
 * 2010 by KjellKod.cc. This is PUBLIC DOMAIN to use at your own risk and comes
 * with no warranties. This code is yours to share, use and modify with no
 * strings attached and no restrictions or obligations.
 * ============================================================================
 *
 * Example of a Active Object, using C++11 std::thread mechanisms to make it
 * safe for thread communication.
 *
 * This was originally published at http://sites.google.com/site/kjellhedstrom2/active-object-with-cpp0x
 * and inspired from Herb Sutter's C++11 Active Object
 * http://herbsutter.com/2010/07/12/effective-concurrency-prefer-using-active-objects-instead-of-naked-threads
 *
 * The code below uses JustSoftware Solutions Inc std::thread implementation
 * http://www.justsoftwaresolutions.co.uk
 *
 * Last update 2012-10-10, by Kjell Hedstrom,
 * e-mail: hedstrom at kjellkod dot cc
 * linkedin: http://linkedin.com/se/kjellkod */
#ifndef ACTIVE_H_
#define ACTIVE_H_

#include <atomic>
#include <cstddef>
#include <functional>
#include <memory>
#include <thread>
#include <concurrent_queue.h>  // Microsoft PPL: Concurrency::concurrent_queue

// One serialized log record handed from the logging thread to the writer thread.
struct Buffer {
  Buffer() : m_Len(0), m_pMsg(NULL) {}
  explicit Buffer(int size) : m_Len(size), m_pMsg(new char[size]) {}
  ~Buffer() { delete[] m_pMsg; }
  int m_Len;
  char* m_pMsg;
 private:
  Buffer(const Buffer&);             // non-copyable: owns m_pMsg
  Buffer& operator=(const Buffer&);
};

typedef std::function<void(Buffer*)> Callback;

class Active {
 private:
  Active(const Active&);             // c++11 feature not yet in vs2010 = delete;
  Active& operator=(const Active&);  // c++11 feature not yet in vs2010 = delete;
  Active();                          // Construction ONLY through factory createActive();
  void run();

  Concurrency::concurrent_queue<Buffer*> mq_;
  std::thread thd_;
  std::atomic<bool> done_;           // set by ~Active, read by the worker thread
  Callback callBack_;

 public:
  virtual ~Active();
  void send(Buffer* apBuffer);
  static std::unique_ptr<Active> createActive(Callback aCallBack);  // Factory: safe construction & thread start
};

#endif
```

```cpp
// active.cpp: same public-domain KjellKod.cc header as active.h
#include "active.h"
#include <chrono>

Active::Active() : done_(false) {}

Active::~Active() {
  done_ = true;  // ask run() to exit once the queue is drained
  if (thd_.joinable()) thd_.join();
}

// Add asynchronously a work-message to queue
void Active::send(Buffer* apBuffer) {
  if (NULL != apBuffer) {
    mq_.push(apBuffer);
  }
}

void Active::run() {
  for (;;) {
    const bool stopping = done_;  // read BEFORE try_pop so nothing pushed earlier is lost
    Buffer* pBuffer = NULL;
    if (mq_.try_pop(pBuffer)) {
      callBack_(pBuffer);
      delete pBuffer;
    } else if (stopping) {
      break;  // queue empty and shutdown requested
    } else {
      // concurrent_queue has no blocking pop: back off instead of spinning at 100% CPU
      std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }
  }
}

// Factory: safe construction of object before thread start
std::unique_ptr<Active> Active::createActive(Callback aCallBack) {
  std::unique_ptr<Active> aPtr(new Active());
  aPtr->callBack_ = aCallBack;  // set BEFORE the thread starts, so run() never sees an empty callback
  aPtr->thd_ = std::thread(&Active::run, aPtr.get());
  return aPtr;
}
```
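The key part is ThreadLog, which implements the Logger interface. Write only serializes the record and hands it to the Active object; the real disk writing happens in WriteInteral on the writer thread (reached through CallBack). The Set* functions are called from inside (my modified) glog.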
```cpp
// ThreadLog.h
#pragma once
#include <glog/logging.h>
#include <cstdio>
#include <memory>
#include <mutex>
#include <string>
#include "active.h"

using namespace std;

namespace google {

class ThreadLog : public google::base::Logger {
 public:
  ThreadLog();
  ~ThreadLog();

  virtual void Write(bool force_flush, time_t timestamp,
                     const char* message, int message_len);
  virtual void Flush();
  virtual uint32 LogSize();

  // Configuration options
  void SetBasename(const char* basename);
  void SetExtension(const char* ext);
  void SetSymlinkBasename(const char* symlink_basename);

  void CallBack(Buffer* pBuffer);  // runs on the Active object's thread

 private:
  static const uint32 kRolloverAttemptFrequency = 0x20;

  mutex lock_;                     // guards the state below (writer thread vs. Set*/Flush callers)
  bool base_filename_selected_;
  string base_filename_;
  string symlink_basename_;
  string filename_extension_;      // option users can specify (eg to add port#)
  FILE* file_;
  LogSeverity severity_;
  uint32 bytes_since_flush_;
  uint32 file_length_;
  unsigned int rollover_attempt_;
  int64 next_flush_time_;          // cycle count at which to flush log
  string hostname;
  bool stopWriting;
  std::unique_ptr<Active> m_pActive;  // keep last: created after all other members

  bool CreateLogfile(const string& time_pid_string);
  void FlushUnlocked();
  void WriteInteral(bool force_flush, time_t timestamp,
                    const char* message, int message_len);
};

}  // namespace google
```

```cpp
// ThreadLog.cpp
#include "ThreadLog.h"
#include "port.h"
#include <fcntl.h>
#include <cerrno>
#include <cstring>
#include <iomanip>
#include <sstream>
#include <vector>
#include "utilities.h"
#include <functional>

namespace google {

// Size of one record: [bool force_flush][time_t timestamp][int len][message]
static int GetSize(int message_len) {
  return static_cast<int>(sizeof(bool) + sizeof(time_t) + sizeof(int)) + message_len;
}

void ThreadLog::Write(bool force_flush, time_t timestamp,
                      const char* message, int message_len) {
  // Runs on the caller's thread: copy the record and hand it off; no disk I/O here.
  Buffer* pBuffer = new Buffer(GetSize(message_len));
  char* curData = pBuffer->m_pMsg;
  memcpy(curData, &force_flush, sizeof(force_flush)); curData += sizeof(force_flush);
  memcpy(curData, &timestamp, sizeof(timestamp));     curData += sizeof(timestamp);
  memcpy(curData, &message_len, sizeof(message_len)); curData += sizeof(message_len);
  memcpy(curData, message, message_len);
  m_pActive->send(pBuffer);
}

void ThreadLog::Flush() {
  // Flushes what the writer thread has already written; records still queued are not included.
  std::lock_guard<std::mutex> lock(lock_);
  FlushUnlocked();
}

google::uint32 ThreadLog::LogSize() {
  std::lock_guard<std::mutex> lock(lock_);
  return file_length_;  // approximate: queued records are not counted yet
}

void ThreadLog::SetBasename(const char* basename) {
  std::lock_guard<std::mutex> lock(lock_);
  base_filename_selected_ = true;
  if (base_filename_ != basename) {
    if (file_ != NULL) {
      fclose(file_);
      file_ = NULL;
      rollover_attempt_ = kRolloverAttemptFrequency - 1;
    }
    base_filename_ = basename;
  }
}

void ThreadLog::SetExtension(const char* ext) {
  std::lock_guard<std::mutex> lock(lock_);
  if (filename_extension_ != ext) {
    // Get rid of old log file since we are changing names
    if (file_ != NULL) {
      fclose(file_);
      file_ = NULL;
      rollover_attempt_ = kRolloverAttemptFrequency - 1;
    }
    filename_extension_ = ext;
  }
}

void ThreadLog::SetSymlinkBasename(const char* symlink_basename) {
  std::lock_guard<std::mutex> lock(lock_);
  symlink_basename_ = symlink_basename;
}

bool ThreadLog::CreateLogfile(const string& time_pid_string) {
  string string_filename = base_filename_ + filename_extension_ + time_pid_string;
  const char* filename = string_filename.c_str();
  int fd = open(filename, O_WRONLY | O_CREAT | O_EXCL, 0664);
  if (fd == -1) return false;
#ifdef HAVE_FCNTL
  // Mark the file close-on-exec. We don't really care if this fails
  fcntl(fd, F_SETFD, FD_CLOEXEC);
#endif

  file_ = fdopen(fd, "a");  // Make a FILE*.
  if (file_ == NULL) {      // Man, we're screwed!
    close(fd);
    unlink(filename);       // Erase the half-baked evidence: an unusable log file
    return false;
  }

  if (!symlink_basename_.empty()) {
    // take directory from filename
    const char* slash = strrchr(filename, '/');
    const string linkname = symlink_basename_ + '.' + LogSeverityNames[severity_];
    string linkpath;
    if (slash) linkpath = string(filename, slash - filename + 1);  // get dirname
    linkpath += linkname;
    unlink(linkpath.c_str());  // delete old one if it exists

    // We must have unistd.h.
#ifdef HAVE_UNISTD_H
    // Make the symlink be relative (in the same dir) so that if the
    // entire log directory gets relocated the link is still valid.
    const char* linkdest = slash ? (slash + 1) : filename;
    if (symlink(linkdest, linkpath.c_str()) != 0) {
      // silently ignore failures
    }

    // Make an additional link to the log file in a place specified by
    // FLAGS_log_link, if indicated
    if (!FLAGS_log_link.empty()) {
      linkpath = FLAGS_log_link + "/" + linkname;
      unlink(linkpath.c_str());  // delete old one if it exists
      if (symlink(filename, linkpath.c_str()) != 0) {
        // silently ignore failures
      }
    }
#endif
  }

  return true;  // Everything worked
}

void ThreadLog::FlushUnlocked() {
  if (file_ != NULL) {
    fflush(file_);
    bytes_since_flush_ = 0;
  }
  const int64 next = (FLAGS_logbufsecs * static_cast<int64>(1000000));  // in usec
  next_flush_time_ = CycleClock_Now() + UsecToCycles(next);
}

ThreadLog::ThreadLog()
    : base_filename_selected_(false),
      file_(NULL),
      severity_(GLOG_INFO),  // assumption: installed for INFO, as in main() below
      bytes_since_flush_(0),
      file_length_(0),
      rollover_attempt_(0),
      next_flush_time_(0),
      stopWriting(false),
      m_pActive(Active::createActive(std::bind(&ThreadLog::CallBack, this, std::placeholders::_1))) {}

ThreadLog::~ThreadLog() {
  m_pActive.reset();  // drain the queue and join the writer thread first
  std::lock_guard<std::mutex> lock(lock_);
  if (file_ != NULL) {
    fclose(file_);
    file_ = NULL;
  }
}

// Called with lock_ held (from CallBack).
void ThreadLog::WriteInteral(bool force_flush, time_t timestamp,
                             const char* message, int message_len) {
  if (base_filename_selected_ && base_filename_.empty()) {
    return;
  }

  if (static_cast<int>(file_length_ >> 20) >= MaxLogSize()) {
    if (file_ != NULL) fclose(file_);
    file_ = NULL;
    file_length_ = bytes_since_flush_ = 0;
    rollover_attempt_ = kRolloverAttemptFrequency - 1;
  }

  if (file_ == NULL) {
    //if (++rollover_attempt_ != kRolloverAttemptFrequency)
    //  return;
    //rollover_attempt_ = 0;
    struct ::tm tm_time;
    localtime_r(&timestamp, &tm_time);

    ostringstream time_pid_stream;
    time_pid_stream.fill('0');
    time_pid_stream << 1900 + tm_time.tm_year
                    << setw(2) << 1 + tm_time.tm_mon
                    << setw(2) << tm_time.tm_mday
                    << '-'
                    << setw(2) << tm_time.tm_hour
                    << setw(2) << tm_time.tm_min
                    << setw(2) << tm_time.tm_sec
                    << '.'
                    << GetCurrentThreadId();
    const string& time_pid_string = time_pid_stream.str();

    if (base_filename_selected_) {
      if (!CreateLogfile(time_pid_string)) {
        perror("Could not create log file");
        fprintf(stderr, "COULD NOT CREATE LOGFILE '%s'!\n", time_pid_string.c_str());
        return;
      }
    } else {
      string stripped_filename(glog_internal_namespace_::ProgramInvocationShortName());
      GetHostName(&hostname);
      string uidname = MyUserName();
      if (uidname.empty()) uidname = "invalid-user";
      stripped_filename = stripped_filename + '.' + hostname + '.' + uidname + ".log." +
                          LogSeverityNames[severity_] + '.';
      const vector<string>& log_dirs = GetLoggingDirectories();
      bool success = false;
      for (vector<string>::const_iterator dir = log_dirs.begin(); dir != log_dirs.end(); ++dir) {
        base_filename_ = *dir + "/" + stripped_filename;
        if (CreateLogfile(time_pid_string)) {
          success = true;
          break;
        }
      }
      if (success == false) {
        perror("Could not create logging file");
        fprintf(stderr, "COULD NOT CREATE A LOGGINGFILE %s!", time_pid_string.c_str());
        return;
      }
    }

    ostringstream file_header_stream;
    file_header_stream.fill('0');
    file_header_stream << "Log file created at: "
                       << 1900 + tm_time.tm_year << '/'
                       << setw(2) << 1 + tm_time.tm_mon << '/'
                       << setw(2) << tm_time.tm_mday
                       << ' '
                       << setw(2) << tm_time.tm_hour << ':'
                       << setw(2) << tm_time.tm_min << ':'
                       << setw(2) << tm_time.tm_sec << '\n'
                       << "Running on machine: " << hostname << '\n'
                       << "Log line format: [IWEF]mmdd hh:mm:ss.uuuuuu "
                       << "threadid file:line] msg" << '\n';
    const string& file_header_string = file_header_stream.str();
    const int header_len = file_header_string.size();
    fwrite(file_header_string.data(), 1, header_len, file_);
    file_length_ += header_len;
    bytes_since_flush_ += header_len;
  }

  if (!stopWriting) {
    errno = 0;
    fwrite(message, 1, message_len, file_);
    if (FLAGS_stop_logging_if_full_disk && errno == ENOSPC) {  // disk full, stop writing to disk
      stopWriting = true;  // until the disk is
      return;
    } else {
      file_length_ += message_len;
      bytes_since_flush_ += message_len;
    }
  } else {
    if (CycleClock_Now() >= next_flush_time_)
      stopWriting = false;  // check to see if disk has free space.
  }

  if (force_flush || (bytes_since_flush_ >= 1000000) || (CycleClock_Now() >= next_flush_time_)) {
    FlushUnlocked();
#ifdef OS_LINUX
    if (FLAGS_drop_log_memory) {
      if (file_length_ >= logging::kPageSize) {
        // don't evict the most recent page
        uint32 len = file_length_ & ~(logging::kPageSize - 1);
        posix_fadvise(fileno(file_), 0, len, POSIX_FADV_DONTNEED);
      }
    }
#endif
  }
}

void ThreadLog::CallBack(Buffer* pBuffer) {
  // Layout written by Write(): [bool][time_t][int][message bytes].
  // Read with memcpy: timestamp sits at offset sizeof(bool), so *(time_t*)ptr would be unaligned.
  const char* curData = pBuffer->m_pMsg;
  bool force_flush;
  time_t timestamp;
  int message_len;
  memcpy(&force_flush, curData, sizeof(force_flush)); curData += sizeof(force_flush);
  memcpy(&timestamp, curData, sizeof(timestamp));     curData += sizeof(timestamp);
  memcpy(&message_len, curData, sizeof(message_len)); curData += sizeof(message_len);
  std::lock_guard<std::mutex> lock(lock_);  // Set*/Flush may run concurrently on other threads
  WriteInteral(force_flush, timestamp, curData, message_len);
}

}  // namespace google
```
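To check the record layout that Write and CallBack must agree on, here is a standalone round-trip sketch. PackRecord, UnpackRecord and Record are hypothetical helpers, and std::vector<char> stands in for Buffer so the example is self-contained. Reading back with memcpy rather than *(time_t*)ptr matters because time_t lands right after a one-byte bool, i.e. at a misaligned offset:

```cpp
#include <cassert>
#include <cstring>
#include <ctime>
#include <string>
#include <vector>

// Hypothetical helpers mirroring ThreadLog::Write / ThreadLog::CallBack.
// Layout: [bool force_flush][time_t timestamp][int len][len message bytes]
struct Record {
  bool force_flush;
  std::time_t timestamp;
  std::string message;
};

std::vector<char> PackRecord(bool force_flush, std::time_t timestamp,
                             const char* message, int message_len) {
  std::vector<char> buf(sizeof(force_flush) + sizeof(timestamp) +
                        sizeof(message_len) + message_len);
  char* p = buf.data();
  std::memcpy(p, &force_flush, sizeof(force_flush)); p += sizeof(force_flush);
  std::memcpy(p, &timestamp, sizeof(timestamp));     p += sizeof(timestamp);
  std::memcpy(p, &message_len, sizeof(message_len)); p += sizeof(message_len);
  std::memcpy(p, message, message_len);
  return buf;
}

// memcpy copes with any alignment, unlike dereferencing a cast pointer.
Record UnpackRecord(const std::vector<char>& buf) {
  Record r;
  int len = 0;
  const char* p = buf.data();
  std::memcpy(&r.force_flush, p, sizeof(r.force_flush)); p += sizeof(r.force_flush);
  std::memcpy(&r.timestamp, p, sizeof(r.timestamp));     p += sizeof(r.timestamp);
  std::memcpy(&len, p, sizeof(len));                     p += sizeof(len);
  assert(p + len == buf.data() + buf.size());  // header and payload sizes must agree
  r.message.assign(p, len);
  return r;
}
```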
With this in place, main can use it like this, plugging our ThreadLog class into glog:
```cpp
#define GLOG_NO_ABBREVIATED_SEVERITIES
#include <windows.h>
#include <glog/logging.h>
#include "ThreadLog.h"

using namespace google;

int main(int argc, char* argv[]) {
  google::InitGoogleLogging("test/testsss");

  // Install the asynchronous logger for INFO (SetLogger lives in google::base).
  google::base::Logger* mylogger = new google::ThreadLog;
  google::base::SetLogger(google::GLOG_INFO, mylogger);
  google::SetLogDestination(google::GLOG_INFO, "../Debug/logtestInfo");
  //google::SetLogDestination(google::GLOG_ERROR, "../Debug/logtestDebug");

  // Note: echoing to stderr still happens synchronously on the calling thread.
  google::SetStderrLogging(google::GLOG_INFO);
  //google::SetStderrLogging(google::GLOG_ERROR);
  //google::LogToStderr();

  for (int i = 0; i < 1000; ++i) {
    LOG(INFO) << "how are " << i << " cookies";
  }
  google::ShutdownGoogleLogging();
  return 0;
}
```
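Of course this code will not compile against stock glog as-is: I modified glog's internal source (for example, the extra Set* methods on base::Logger).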
I will publish the GitHub project shortly at: https://github.com/boyxiaolong/Proejcts/tree/master/asyn_glog-0.3.3
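Testing still turns up some problems: the output is occasionally garbled, and the per-message dynamic allocation of Buffer needs to be optimized.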
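One possible way to cut the per-message new[]/delete[] is to recycle buffers through a small free list. The sketch below is hypothetical (the BufferPool name and the cap of 1024 spare buffers are my assumptions) and is not what the project does. With it, Write would Acquire a buffer and the writer thread would Release it after the fwrite instead of deleting it:

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical free-list pool: steady-state logging reuses buffer capacity
// instead of calling new[]/delete[] for every message.
class BufferPool {
 public:
  using Buf = std::unique_ptr<std::vector<char>>;

  // Producer side: get a buffer holding exactly `size` bytes.
  Buf Acquire(std::size_t size) {
    Buf b;
    {
      std::lock_guard<std::mutex> lock(mu_);
      if (!free_.empty()) {
        b = std::move(free_.back());
        free_.pop_back();
      }
    }
    if (!b) b.reset(new std::vector<char>());
    b->resize(size);  // no reallocation when the old capacity suffices
    return b;
  }

  // Consumer side: hand the buffer back once it has been written to disk.
  void Release(Buf b) {
    std::lock_guard<std::mutex> lock(mu_);
    if (free_.size() < kMaxFree) free_.push_back(std::move(b));
  }  // otherwise b is destroyed here, which bounds the pool's memory

  std::size_t FreeCount() {
    std::lock_guard<std::mutex> lock(mu_);
    return free_.size();
  }

 private:
  static const std::size_t kMaxFree = 1024;  // arbitrary cap (assumption)
  std::mutex mu_;
  std::vector<Buf> free_;
};
```

The mutex here is only held for a push/pop on the free list, so it should be cheap compared with a heap allocation, though that is worth measuring alongside the planned benchmarks.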
But those are topics for later.