Skip to content

Commit

Permalink
Cherry pick: [FLASH-1126] Support search log (#679)
Browse files Browse the repository at this point in the history
  • Loading branch information
leiysky committed Apr 29, 2020
1 parent 96e9d04 commit 3684935
Show file tree
Hide file tree
Showing 8 changed files with 512 additions and 86 deletions.
76 changes: 68 additions & 8 deletions dbms/src/Flash/DiagnosticsService.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include <Common/Exception.h>
#include <Flash/DiagnosticsService.h>
#include <Flash/LogSearch.h>

#include <Poco/File.h>
#include <Poco/Path.h>
Expand All @@ -21,6 +22,8 @@
namespace DB
{

using diagnosticspb::LogLevel;
using diagnosticspb::SearchLogResponse;
using diagnosticspb::ServerInfoItem;
using diagnosticspb::ServerInfoPair;
using diagnosticspb::ServerInfoResponse;
Expand Down Expand Up @@ -817,14 +820,71 @@ catch (const Exception & e)
return grpc::Status(grpc::StatusCode::INTERNAL, "internal error");
}

::grpc::Status DiagnosticsService::search_log(::grpc::ServerContext * context, const ::diagnosticspb::SearchLogRequest * request,
::grpc::ServerWriter<::diagnosticspb::SearchLogResponse> * writer)
::grpc::Status DiagnosticsService::search_log(::grpc::ServerContext * grpc_context, const ::diagnosticspb::SearchLogRequest * request,
::grpc::ServerWriter<::diagnosticspb::SearchLogResponse> * stream)
{
(void)context;
(void)request;
(void)writer;
/// TODO: implement this
return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "unimplemented");
(void)grpc_context;

/// TODO: add error log
Poco::File log_file(Poco::Path(server.config().getString("logger.log")));
if (!log_file.exists())
{
LOG_ERROR(log, "Invalid log path: " << log_file.path());
return ::grpc::Status(grpc::StatusCode::INTERNAL, "internal error");
}

int64_t start_time = request->start_time();
int64_t end_time = request->end_time();
if (end_time == 0)
{
// default to now
end_time = std::chrono::milliseconds(std::time(NULL)).count();
}
std::vector<LogLevel> levels;
for (auto level : request->levels())
{
levels.push_back(static_cast<LogLevel>(level));
}

std::vector<std::string> patterns;
for (auto pattern : request->patterns())
{
patterns.push_back(pattern);
}

auto in_ptr = std::shared_ptr<std::ifstream>(new std::ifstream(log_file.path()));

LogIterator log_itr(start_time, end_time, levels, patterns, in_ptr);

static constexpr size_t LOG_BATCH_SIZE = 256;

LOG_DEBUG(log, __PRETTY_FUNCTION__ << ": Handling SearchLog: " << request->DebugString());
for (;;)
{
size_t i = 0;
auto resp = SearchLogResponse::default_instance();
while (auto log_msg = log_itr.next())
{
i++;
auto added_msg = resp.add_messages();
*added_msg = *log_msg;

if (i == LOG_BATCH_SIZE - 1)
break;
}

if (i == 0)
break;

if (!stream->Write(resp))
{
LOG_DEBUG(log, __PRETTY_FUNCTION__ << ": Write response failed for unknown reason.");
return grpc::Status(grpc::StatusCode::UNKNOWN, "Write response failed for unknown reason.");
}
}
LOG_DEBUG(log, __PRETTY_FUNCTION__ << ": Handling SearchLog done: " << request->DebugString());

return ::grpc::Status::OK;
}

} // namespace DB
} // namespace DB
13 changes: 9 additions & 4 deletions dbms/src/Flash/DiagnosticsService.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#pragma once

#include <Server/IServer.h>

#include <common/logger_useful.h>
#include <boost/noncopyable.hpp>

Expand All @@ -17,14 +19,14 @@ class DiagnosticsService final : public ::diagnosticspb::Diagnostics::Service,
private boost::noncopyable
{
public:
DiagnosticsService() : log(&Logger::get("DiagnosticsService")) {}
DiagnosticsService(IServer & _server) : log(&Logger::get("DiagnosticsService")), server(_server) {}
~DiagnosticsService() override {}

public:
::grpc::Status search_log(::grpc::ServerContext * context, const ::diagnosticspb::SearchLogRequest * request,
::grpc::ServerWriter<::diagnosticspb::SearchLogResponse> * writer) override;
::grpc::Status search_log(::grpc::ServerContext * grpc_context, const ::diagnosticspb::SearchLogRequest * request,
::grpc::ServerWriter<::diagnosticspb::SearchLogResponse> * stream) override;

::grpc::Status server_info(::grpc::ServerContext * context, const ::diagnosticspb::ServerInfoRequest * request,
::grpc::Status server_info(::grpc::ServerContext * grpc_context, const ::diagnosticspb::ServerInfoRequest * request,
::diagnosticspb::ServerInfoResponse * response) override;

public:
Expand Down Expand Up @@ -130,7 +132,10 @@ class DiagnosticsService final : public ::diagnosticspb::Diagnostics::Service,
void systemInfo(std::vector<diagnosticspb::ServerInfoItem> & server_info_items);
void processInfo(std::vector<diagnosticspb::ServerInfoItem> & server_info_items);

private:
Poco::Logger * log;

IServer & server;
};

} // namespace DB
194 changes: 194 additions & 0 deletions dbms/src/Flash/LogSearch.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
#include <regex>

#include <Flash/LogSearch.h>
#include <boost/algorithm/string.hpp>
#include <boost/algorithm/string/predicate.hpp>

namespace DB
{
using ::diagnosticspb::LogLevel;
using ::diagnosticspb::LogMessage;

std::optional<LogMessage> LogIterator::next()
{
for (;;)
{
auto result = readLog();
if (auto err = std::get_if<Error>(&result); err)
{
if (err->tp != Error::Type::EOI)
{
LOG_ERROR(log, "readLog error: " << err->extra_msg);
}
return {};
}

auto entry = std::get<LogEntry>(result);
LogMessage msg;
msg.set_time(entry.time);
msg.set_message(entry.message);
LogLevel level;
switch (entry.level)
{
case LogEntry::Level::Trace:
level = LogLevel::Trace;
break;
case LogEntry::Level::Debug:
level = LogLevel::Debug;
break;
case LogEntry::Level::Info:
level = LogLevel::Info;
break;
case LogEntry::Level::Warn:
level = LogLevel::Warn;
break;
case LogEntry::Level::Error:
level = LogLevel::Error;
break;
default:
level = LogLevel::UNKNOWN;
break;
}
msg.set_level(level);

if (match(msg))
{
return msg;
}
}
}

bool LogIterator::match(const LogMessage & log_msg) const
{
// Check time range
if (log_msg.time() >= end_time || log_msg.time() < start_time)
return false;

// Check level
if (std::find(levels.begin(), levels.end(), log_msg.level()) == levels.end() && !levels.empty())
return false;

// Grep
auto & content = log_msg.message();
for (auto regex : patterns)
{
if (!std::regex_match(content, regex))
return false;
}

return true;
}

LogIterator::Result<LogIterator::LogEntry> LogIterator::readLog()
{
if (!*log_file)
{
if (log_file->eof())
return Error{Error::Type::EOI};
else
return Error{Error::Type::UNKNOWN};
}

std::string line;
std::getline(*log_file, line);
std::stringstream buff;

// TiFlash log format: YYYY.MM.DD hh:mm:ss.mmmmmm [ ThreadID ] <Level> channel: message
std::regex head_line_pattern("^\\d{4}\\.\\d{2}\\.\\d{2}\\s\\d{2}\\:\\d{2}\\:\\d{2}\\.\\d{6}\\s\\[\\s\\d+\\s\\]\\s\\<\\w+\\>\\s.*");
if (std::regex_match(line, head_line_pattern))
{
buff << line;
while (getline(*log_file, line))
{
if (!std::regex_match(line, head_line_pattern))
{
buff << "\n";
buff << line;
if (log_file->eof())
break;
}
else
{
size_t offset = line.size();
if (log_file->eof())
{
// We should call clear before seek if the fstream has reached end.
log_file->clear();
}
// Seek back to start of the line. Notice that getline will omit '\n', so we should handle it manually.
log_file->seekg(-(offset + 1), log_file->cur);
break;
}
}
}
else
{
return Error{Error::Type::UNEXPECTED_LOG_HEAD, line};
}

LogEntry entry;

std::string log_content = buff.str();

int year;
int month;
int day;
int hour;
int minute;
int second;
int milli_second;

int thread_id;
char level_buff[20];

std::sscanf(log_content.data(), "%d.%d.%d %d:%d:%d.%d [ %d ] %s ", &year, &month, &day, &hour, &minute, &second, &milli_second,
&thread_id, level_buff);

{
std::tm time;
time.tm_year = year - 1900;
time.tm_mon = month - 1;
time.tm_mday = day;
time.tm_hour = hour;
time.tm_min = minute;
time.tm_sec = second;

time_t ctime = mktime(&time) * 1000; // milliseconds
ctime += milli_second / 1000; // truncate microseconds

entry.time = ctime;
if (entry.time > end_time)
return Error{Error::Type::EOI};
}

{
std::string level_str(level_buff);
if (level_str == "<Trace>")
entry.level = LogEntry::Level::Trace;
else if (level_str == "<Debug>")
entry.level = LogEntry::Level::Debug;
else if (level_str == "<Information>")
entry.level = LogEntry::Level::Info;
else if (level_str == "<Warning>")
entry.level = LogEntry::Level::Warn;
else if (level_str == "<Error>")
entry.level = LogEntry::Level::Error;
else
return Error{Error::Type::INVALID_LOG_LEVEL, "level: " + level_str};
}

entry.thread_id = thread_id;

{
std::istringstream ss(log_content);
int offset = 33 + std::to_string(thread_id).size() + std::string(level_buff).size();
ss.seekg(offset, ss.beg);

std::string message(std::istreambuf_iterator<char>(ss), {});
entry.message = message;
}

return entry;
}

} // namespace DB
Loading

0 comments on commit 3684935

Please sign in to comment.