* Kindling Probe 流程介绍
概述
Kndling probe将来自agent-libs的事件数据进行了进一步处理,将其封装成KindlingEvent,并通过CGO接口接收来自Collector端的订阅,并将被订阅的KindlingEvent发送到Collector端做进一步处理。
实现流程浅析
从执行入口main函数着眼来看,我们可以看到Kindling probe有2个比较重要的方法,分别是subscribe_server以及send_server。它们的作用如下:
- subEventForGo:接收来自collector端的事件订阅
- getKindlingEvent:向collector端发送处理好的KindlingEvent
有关probe对事件的处理流程概要如下图所示:
数据处理&发送逻辑
限于篇幅,这里我们重点看一下getKindlingEvent的实现。getKindlingEvent首先调用inspector->next(&ev)从agent-libs中获取了一个sinsp类型的事件指针,紧接着,过滤掉了自己这个进程本身以及一些不包含信息的io事件,随后,将agent-libs里的有效内容转移到kindling-event.
int getEvent(void **pp_kindling_event)
{
int32_t res;
sinsp_evt *ev;
res = inspector->next(&ev);
if(res == SCAP_TIMEOUT)
{
return -1;
}
else if(res != SCAP_SUCCESS)
{
return -1;
}
if(!inspector->is_debug_enabled() &&
ev->get_category() & EC_INTERNAL)
{
return -1;
}
auto threadInfo = ev->get_thread_info();
if(threadInfo == nullptr)
{
return -1;
}
auto category = ev->get_category();
if(category & EC_IO_BASE)
{
auto pres = ev->get_param_value_raw("res");
if(pres && *(int64_t *)pres->m_val <= 0)
{
return -1;
}
}
logCache->addLog(ev);
cpuConverter->Cache(ev);
uint16_t kindling_category = get_kindling_category(ev);
uint16_t ev_type = ev->get_type();
if(event_filters[ev_type][kindling_category] == 0)
{
return -1;
}
kindling_event_t_for_go *p_kindling_event;
if(nullptr == *pp_kindling_event)
{
// New a kindling_event_t_for_go
...
}
p_kindling_event = (kindling_event_t_for_go *)*pp_kindling_event;
if (ev_type == PPME_CPU_ANALYSIS_E) {
return cpuConverter->convert(p_kindling_event, ev);
}
sinsp_fdinfo_t *fdInfo = ev->get_fd_info();
p_kindling_event->timestamp = ev->get_ts();
p_kindling_event->category = kindling_category;
p_kindling_event->context.tinfo.pid = threadInfo->m_pid;
p_kindling_event->context.tinfo.tid = threadInfo->m_tid;
p_kindling_event->context.tinfo.uid = threadInfo->m_uid;
p_kindling_event->context.tinfo.gid = threadInfo->m_gid;
p_kindling_event->context.fdInfo.num = ev->get_fd_num();
if(nullptr != fdInfo)
{
p_kindling_event->context.fdInfo.fdType = fdInfo->m_type;
switch(fdInfo->m_type)
{
case SCAP_FD_FILE:
case SCAP_FD_FILE_V2:
{
...
break;
}
case SCAP_FD_IPV4_SOCK:
case SCAP_FD_IPV4_SERVSOCK:
...
break;
case SCAP_FD_UNIX_SOCK:
...
break;
default:
break;
}
}
uint16_t userAttNumber = 0;
switch(ev->get_type())
{
case PPME_TCP_RCV_ESTABLISHED_E:
case PPME_TCP_CLOSE_E:
{
...
break;
}
case PPME_TCP_CONNECT_X:
{
...
break;
}
case PPME_TCP_DROP_E:
case PPME_TCP_RETRANCESMIT_SKB_E:
case PPME_TCP_SET_STATE_E:
{
...
break;
}
case PPME_TCP_SEND_RESET_E:
case PPME_TCP_RECEIVE_RESET_E:
{
auto pTuple = ev->get_param_value_raw("tuple");
userAttNumber = setTuple(p_kindling_event, pTuple, userAttNumber);
break;
}
default:
{
uint16_t paramsNumber = ev->get_num_params();
if(paramsNumber > 8)
{
paramsNumber = 8;
}
for(auto i = 0; i < paramsNumber; i++)
{
strcpy(p_kindling_event->userAttributes[userAttNumber].key, (char *)ev->get_param_name(i));
memcpy(p_kindling_event->userAttributes[userAttNumber].value, ev->get_param(i)->m_val,
ev->get_param(i)->m_len);
p_kindling_event->userAttributes[userAttNumber].len = ev->get_param(i)->m_len;
p_kindling_event->userAttributes[userAttNumber].valueType = get_type(ev->get_param_info(i)->type);
userAttNumber++;
}
}
}
p_kindling_event->paramsNumber = userAttNumber;
strcpy(p_kindling_event->name, (char *)ev->get_name());
strcpy(p_kindling_event->context.tinfo.comm, (char *)threadInfo->m_comm.data());
strcpy(p_kindling_event->context.tinfo.containerId, (char *)threadInfo->m_container_id.data());
return 1;
}
接受订阅逻辑
probe启动时,会调用init_sub_label将所有订阅信息清空 probe端接收订阅之后的核心处理逻辑是sub_event这个函数,这个函数主要做了两个工作,一是将所有接收到的订阅事件存储到event_filters中,event_filters是一个以事件号和类型为index的二维数组。
void init_sub_label()
{
for(auto e : kindling_to_sysdig)
{
m_events[e.event_name] = e.event_type;
}
for(auto c : category_map)
{
m_categories[c.cateogry_name] = c.category_value;
}
for(int i = 0; i < 1024; i++)
{
for(int j = 0; j < 16; j++)
{
event_filters[i][j] = 0;
}
}
}
void sub_event(char *eventName, char *category)
{
cout << "sub event name:" << eventName << " && category:" << category << endl;
auto it_type = m_events.find(eventName);
if(it_type != m_events.end())
{
if(category == nullptr || category[0] == '\0')
{
for(int j = 0; j < 16; j++)
{
event_filters[it_type->second][j] = 1;
}
}
else
{
auto it_category = m_categories.find(category);
if(it_category != m_categories.end())
{
event_filters[it_type->second][it_category->second] = 1;
}
}
}
}