zabbix_aggentd 源码分析之基础篇

时间 : 18-12-20 栏目 : linux运维 作者 : 老薛 评论 : 0 点击 : 396 次


系统环境

系统版本:CentOS release 6.7 (Final)

内核版本:2.6.32-573.el6.x86_64

软件版本:zabbix-3.4.1


zabbix 源码组织结构

zabbix是采用Automake方式构建的开源项目,服务和工具是通过C语言实现,实现来跨平台的能力,目前zabbix_server还不支持Windows系统。

主要的目录是:

src为C代码的源码目录,includeC代码的头文件,frontends为前端php代码。

src目录libsmodules和各功能主程序的目录,zabbix_agent就是本文需要分析的主目录。

zabbix_agent main函数源码解析

/*主函数main()*/

main(int
argc, char **argv)

/*读取配置文件及接受命令行-------------开始*/

/*根据不同的平台(LinuxAIXSolaris等),设置argc,argv及环境变量*/

argv
= setproctitle_save_env(argc, argv);

/*检查args_parse参数*/

parse_commandline(argc,
argv, &t)

/*解析命令行,通过命令行来判断执行动作或是获取configfile*/

zbx_getopt_long(argc,
argv, shortopts, longopts, NULL)

/*初始化监控指标*/

init_metrics();

/*载入配置文件*/

zbx_load_config(ZBX_CFG_FILE_REQUIRED,
&t);

/*读取解析配置文件按关键字存放如定义的cfg结构体中,其中包括设置了很多全局字符指针的初始化*/

parse_cfg_file(CONFIG_FILE,
cfg, requirement, ZBX_CFG_STRICT);

 Set_defaults()     

/*根据CONFIG_HOSTNAME_ITEM指向的字符串 通过特殊字符解析为keyparameters,将AGENT_RESULT中的str指向CONFIG_HOSTNAME,在command中找到key值对于的节点设置该点的函数指针command->function(&request, result)*/

process(CONFIG_HOSTNAME_ITEM, PROCESS_LOCAL_COMMAND, &result) 

/*读取配置文件及接受命令行-------------结束*/

/*预定义宏函数*/

START_MAIN_ZABBIX_ENTRY(CONFIG_ALLOW_ROOT,
CONFIG_USER, t.flags);

START_MAIN_ZABBIX_ENTRY函数的宏定义在include/daemon.h文件中

#define
START_MAIN_ZABBIX_ENTRY(allow_root, user, flags)        daemon_start(allow_root, user, flags)

int     daemon_start(int allow_root, const char
*user, unsigned int flags)

{    

 //启动程序的开始入口

return
MAIN_ZABBIX_ENTRY(flags);

}

int     MAIN_ZABBIX_ENTRY(int flags)

{

   …

   //开启三个进程

   switch (thread_args->process_type)

   {

       case ZBX_PROCESS_TYPE_COLLECTOR:

           //用来收集本机的基本监控信息

threads[i] =
zbx_thread_start(collector_thread, thread_args);

           break;

       case ZBX_PROCESS_TYPE_LISTENER:

           thread_args->args = &listen_sock;

           //用来实现zabbix master连接过来的被动监控

           threads[i] = zbx_thread_start(listener_thread,
thread_args);

           break;

       case
ZBX_PROCESS_TYPE_ACTIVE_CHECKS:

           thread_args->args =
&CONFIG_ACTIVE_ARGS[j++];

           //主动的推送数据进程 active.c

           threads[i] =
zbx_thread_start(active_checks_thread, thread_args);

            break;

      }

    …

}

主动监控里面有个重要的逻辑,就是获取监控数据的refresh时间,应该是貌似是60s,但是如果你的配置有申明的话,那就按照config来说 。调用的是refresh_active_checks函数,序列化是用json处理的。

ZBX_THREAD_ENTRY(active_checks_thread,
args)

{

   …

   //zabbix调用的是send_buffer 函数方法

   if (now >= nextsend)

      {

         send_buffer(activechk_args.host,
activechk_args.port);

         nextsend = (int)time(NULL) + 1;

      }

  //获取监控数据的refresh时间

 if (now >= nextrefresh)

 {

     zbx_setproctitle("active checks #%d
[getting list of active checks]", process_num);

     if (FAIL ==
refresh_active_checks(activechk_args.host, activechk_args.port))

     {

         nextrefresh = (int)time(NULL) + 60;

     }

     else

     {

     nextrefresh = (int)time(NULL) +
CONFIG_REFRESH_ACTIVE_CHECKS;

     }

}

}

load_modules(CONFIG_LOAD_MODULE_PATH, CONFIG_LOAD_MODULE, CONFIG_TIMEOUT, 0))

metrics = func_list();   //metrics指向ZBX_MODULE_FUNC_ITEM_LIST(模块列表首地址)

/*初始化item指标*/

void    init_metrics()

{

add_metric(&parameters_agent[i], error, sizeof(error))

}

/*添加监控指标*/

int    
add_metric(ZBX_METRIC *metric, char *error, size_t max_error_len)

{

 //将动态链接库的函数加入到command结构中的function

commands[i].function = metric->function

}

zbx_tcp_init(&s_in, (ZBX_SOCKET)fileno(stdin));

zbx_tcp_init(&s_out, (ZBX_SOCKET)fileno(stdout)); //将标准输入输出通过管道链接到socket的接受和发送

zbx_tcp_recv(&s_in, &command)) //获取socket中数据放入command中

 

process(command, 0, &result);   //第二次调用该函数,第一次调用是解析配置文件,第二次调用是解析来自socket数据。遍历commands,通过解析的key值找到commands中对应的函数:

for(command = commands; NULL != command->key; command++ )

if (0 == strcmp(command->key, key)) break;

command->function(&request, result) 该函数为:EXECUTE_USER_PARAMETER(AGENT_REQUEST *request, AGENT_RESULT *result)

/* EXECUTE_USER_PARAMETER 函数*/

int     EXECUTE_USER_PARAMETER(AGENT_REQUEST
*request, AGENT_RESULT *result)

{

        char   
*command;

        if (1 != request->nparam)

        {

                SET_MSG_RESULT(result,
zbx_strdup(NULL, "Too many parameters."));

                return SYSINFO_RET_FAIL;

        }

        command = get_rparam(request, 0);

        return EXECUTE_STR(command, result);

}

/* EXECUTE_STR函数*/

int     EXECUTE_STR(const char *command,
AGENT_RESULT *result)

{

        const char      *__function_name =
"EXECUTE_STR";

        int             ret = SYSINFO_RET_FAIL;

        char            *cmd_result = NULL,
error[MAX_STRING_LEN];

        init_result(result);

        if (SUCCEED != zbx_execute(command,
&cmd_result, error, sizeof(error), CONFIG_TIMEOUT))

        {

                SET_MSG_RESULT(result,
zbx_strdup(NULL, error));

                goto out;

        }

        zbx_rtrim(cmd_result, ZBX_WHITESPACE);

        zabbix_log(LOG_LEVEL_DEBUG, "%s()
command:'%s' len:" ZBX_FS_SIZE_T " cmd_result:'%.20s'",

                        __function_name,
command, (zbx_fs_size_t)strlen(cmd_result), cmd_result);

        SET_TEXT_RESULT(result,
zbx_strdup(NULL, cmd_result));

        ret = SYSINFO_RET_OK;

out:

        zbx_free(cmd_result);

        return ret;

}

/* zbx_execute 函数*/

zbx_execute(command, &cmd_result, error, sizeof(error), CONFIG_TIMEOUT) 

zbx_execute该函数中使用了windows和linux不同的机制执行command分别用宏区分开,返回值为cmd_result。

zbx_rtrim(cmd_result, ZBX_WHITESPACE);

SET_TEXT_RESULT(result, zbx_strdup(NULL, cmd_result))

利用cmd_result来填充AGENT_RESULT 。

 

value = GET_TEXT_RESULT(&result);

value = GET_MSG_RESULT(&result);

zbx_tcp_send(&s_out, *value);

发送数据

int    
zbx_execute(const char *command, char **output, char *error, size_t
max_error_len, int timeout)

{

zbx_alarm_on(timeout);

        if (-1 != (fd = zbx_popen(&pid,
command)))

        {

                int     rc, status;

                char    tmp_buf[PIPE_BUFFER_SIZE];

                while (0 < (rc = read(fd,
tmp_buf, sizeof(tmp_buf) - 1)) && MAX_EXECUTE_OUTPUT_LEN > offset +
rc)

                {

                        tmp_buf[rc] = '\0';

                       
zbx_strcpy_alloc(&buffer, &buf_size, &offset, tmp_buf);

                }

                close(fd);

                if (-1 == rc || -1 ==
zbx_waitpid(pid, &status))

                {

                        if (EINTR == errno)

                                ret =
TIMEOUT_ERROR;

                        else

                               
zbx_snprintf(error, max_error_len, "zbx_waitpid() failed: %s",
zbx_strerror(errno));

                        /* kill the whole
process group, pid must be the leader */

                        if (-1 == kill(-pid,
SIGTERM))

                               
zabbix_log(LOG_LEVEL_ERR, "failed to kill [%s]: %s", command,
zbx_strerror(errno));

                        zbx_waitpid(pid, NULL);

                }

                else if (MAX_EXECUTE_OUTPUT_LEN
<= offset + rc)

                {

                       
zabbix_log(LOG_LEVEL_ERR, "command output exceeded limit of %d
KB",

                                       
MAX_EXECUTE_OUTPUT_LEN / ZBX_KIBIBYTE);

}

                else if (0 == WIFEXITED(status)
|| 0 != WEXITSTATUS(status))

                {

                        if ('\0' == *buffer)

                        {

                                if
(WIFEXITED(status))

                                {

                                       
zbx_snprintf(error, max_error_len, "Process exited with code:
%d.",

                                                       
WEXITSTATUS(status));

                                }

                                else if
(WIFSIGNALED(status))

                                {

                                       
zbx_snprintf(error, max_error_len, "Process killed by signal:
%d.",

                                                       
WTERMSIG(status));

                                }

                                else

                                       
zbx_strlcpy(error, "Process terminated unexpectedly.",
max_error_len);

                        }

                        else

                                zbx_strlcpy(error,
buffer, max_error_len);

                }

                else

                        ret = SUCCEED;

        }

        else

                zbx_strlcpy(error, zbx_strerror(errno),
max_error_len);

       
zbx_alarm_off();

}

被动监控

/* zbx_thread_start 函数路径src/zabbix_agent/zabbix_agentd.c*/

zbx_thread_start(listener_thread, thread_args);

/* listener_thread 函数在src/zabbix_agent/listener.c

 listener_thread进程是负责被动方式,它负责监听10050端口,然后等待server端的请求并进行响应。*/

ZBX_THREAD_ENTRY(listener_thread, args)

{

/*
src/zabbix_agent/listener.c*/

process_listener(&s);

}

/*函数process_listener */

static void     process_listener(zbx_socket_t *s)

{

   AGENT_RESULT    result;

   char            **value = NULL;

   int             ret;

   if (SUCCEED == (ret = zbx_tcp_recv_to(s,
CONFIG_TIMEOUT)))

   {

       zbx_rtrim(s->buffer,
"\r\n");

       zabbix_log(LOG_LEVEL_DEBUG,
"Requested [%s]", s->buffer);

       init_result(&result);

       if (SUCCEED == process(s->buffer,
PROCESS_WITH_ALIAS, &result))

          {      

            if (NULL != (value =
GET_TEXT_RESULT(&result)))

               {      

                  zabbix_log(LOG_LEVEL_DEBUG,
"Sending back [%s]", *value);

                  ret = zbx_tcp_send_to(s,
*value, CONFIG_TIMEOUT);

               }

           }else{      

                 value =
GET_MSG_RESULT(&result);

                 if (NULL != value)

                   {      

                      static char     *buffer = NULL;

                      static size_t   buffer_alloc = 256;

                      size_t          buffer_offset = 0;

zabbix_log(LOG_LEVEL_DEBUG,
"Sending back [" ZBX_NOTSUPPORTED ": %s]", *value);

if (NULL == buffer)

                     buffer = zbx_malloc(buffer,
buffer_alloc);

                     zbx_strncpy_alloc(&buffer,
&buffer_alloc, &buffer_offset,

                                               
ZBX_NOTSUPPORTED, ZBX_CONST_STRLEN(ZBX_NOTSUPPORTED));

                     buffer_offset++;

                     zbx_strcpy_alloc(&buffer,
&buffer_alloc, &buffer_offset, *value);

                     ret =
zbx_tcp_send_bytes_to(s, buffer, buffer_offset, CONFIG_TIMEOUT);

                        }else{

                 zabbix_log(LOG_LEVEL_DEBUG,
"Sending back [" ZBX_NOTSUPPORTED "]");

                 ret = zbx_tcp_send_to(s,
ZBX_NOTSUPPORTED, CONFIG_TIMEOUT);

                        }

                }

                free_result(&result);

        }

        if (FAIL == ret)

          zabbix_log(LOG_LEVEL_DEBUG,
"Process listener error: %s", zbx_socket_strerror());

}

/*src/libs/zbxsysinfo/sysinfo.c */

int  process(const
char *in_command, unsigned flags, AGENT_RESULT *result)

{

   for (command = commands; NULL !=
command->key; command++)

     {      

       zabbix_log(LOG_LEVEL_INFORMATION,
"XUEKUN-----DEBUG---------->%s----->%s", command->key,
request.key);

        if (0 == strcmp(command->key,
request.key))

             break;

     }

     /*执行获取item监控信息指令*/

command->function(&request, result)

}

/* item test parameters; user parameter items
keep command here */

/*include/module.h */

typedef
struct

{

        char            *key;

        unsigned        flags;

        int             (*function)();

        char            *test_param;    /* item test parameters; user parameter
items keep command here */

}ZBX_METRIC;

/*src/libs/zbxsysinfo/linux/linux.c*/

ZBX_METRIC      parameters_specific[] =

/*      KEY                     FLAG            FUNCTION                TEST PARAMETERS */

{

    {"kernel.maxfiles",     0,     
KERNEL_MAXFILES,        NULL},

    {"kernel.maxproc",      0,   
KERNEL_MAXPROC,         NULL},

       

    {"vfs.fs.size", CF_HAVEPARAMS,  VFS_FS_SIZE,    "/,free"},

    {"vfs.fs.inode", CF_HAVEPARAMS,
VFS_FS_INODE,   "/,free"},

    {"vfs.fs.discovery",    0,  
VFS_FS_DISCOVERY,     NULL},

    {"vfs.dev.read", CF_HAVEPARAMS, VFS_DEV_READ,"sda,operations"},

  {"vfs.dev.write", CF_HAVEPARAMS, VFS_DEV_WRITE,
"sda,operations"},

{"net.tcp.listen", CF_HAVEPARAMS,
NET_TCP_LISTEN, "80"},

  {"net.udp.listen", CF_HAVEPARAMS, NET_UDP_LISTEN,  "68"},

  {"net.if.in", CF_HAVEPARAMS,  NET_IF_IN, 
"lo,bytes"},

 {"net.if.out", CF_HAVEPARAMS,  NET_IF_OUT, "lo,bytes"},

 {"net.if.total", CF_HAVEPARAMS, NET_IF_TOTAL,
"lo,bytes"},

 {"net.if.collisions",CF_HAVEPARAMS,
NET_IF_COLLISIONS, "lo"},

 {"net.if.discovery",    0, 
NET_IF_DISCOVERY,       NULL},

{"vm.memory.size",
CF_HAVEPARAMS, VM_MEMORY_SIZE,        
"total"},

{"proc.cpu.util",       CF_HAVEPARAMS, 
PROC_CPU_UTIL,         
"inetd"},

  {"proc.num", CF_HAVEPARAMS,  PROC_NUM,  
"inetd"},

  {"proc.mem", CF_HAVEPARAMS, PROC_MEM,    "inetd"},

  {"system.cpu.switches", 0,
SYSTEM_CPU_SWITCHES, NULL},

{"system.cpu.intr", 0,
SYSTEM_CPU_INTR,  NULL},

 {"system.cpu.util", CF_HAVEPARAMS, SYSTEM_CPU_UTIL,        "all,user,avg1"},

{"system.cpu.load",
CF_HAVEPARAMS,  SYSTEM_CPU_LOAD,        "all,avg1"},

{"system.cpu.num",
CF_HAVEPARAMS,  SYSTEM_CPU_NUM,         "online"},

{"system.cpu.discovery",0,
SYSTEM_CPU_DISCOVERY,   NULL},

{"system.uname",  0, SYSTEM_UNAME,  NULL},

{"system.hw.chassis",
CF_HAVEPARAMS,  SYSTEM_HW_CHASSIS,      NULL},

{"system.hw.cpu",       CF_HAVEPARAMS,  SYSTEM_HW_CPU,          NULL},

        {"system.hw.devices",   CF_HAVEPARAMS,  SYSTEM_HW_DEVICES,      NULL},

        {"system.hw.macaddr",   CF_HAVEPARAMS,  SYSTEM_HW_MACADDR,      NULL},

{"system.sw.arch",
0, SYSTEM_SW_ARCH,  NULL},

{"system.sw.os",
CF_HAVEPARAMS,  SYSTEM_SW_OS,  NULL},

{"system.sw.packages",  CF_HAVEPARAMS,  SYSTEM_SW_PACKAGES,     NULL},

{"system.swap.size",
CF_HAVEPARAMS,  SYSTEM_SWAP_SIZE,       "all,free"},

{"system.swap.in",
CF_HAVEPARAMS,  SYSTEM_SWAP_IN,         "all"},

{"system.swap.out",
CF_HAVEPARAMS,  SYSTEM_SWAP_OUT,        "all"},

{"system.uptime",
0, SYSTEM_UPTIME,  NULL},

{"system.boottime",
0, SYSTEM_BOOTTIME, NULL},

{"sensor",
CF_HAVEPARAMS, GET_SENSOR, "w83781d-i2c-0-2d,temp1"},

{NULL}

};

/*初始化监控指标*/

void    init_metrics()

{

       int    
i;

        char   
error[MAX_STRING_LEN];

        commands = zbx_malloc(commands,
sizeof(ZBX_METRIC));

        commands[0].key = NULL;

#ifdef
WITH_AGENT_METRICS

        for (i = 0; NULL !=
parameters_agent[i].key; i++)

        {      

                if (SUCCEED != add_metric(&parameters_agent[i],
error, sizeof(error)))

                {      

                       
zabbix_log(LOG_LEVEL_CRIT, "cannot add item key: %s", error);

                        exit(EXIT_FAILURE);

                }

        }

#endif

#ifdef
WITH_COMMON_METRICS

        for (i = 0; NULL !=
parameters_common[i].key; i++)

        {

                if (SUCCEED !=
add_metric(&parameters_common[i], error, sizeof(error)))

                {

                       
zabbix_log(LOG_LEVEL_CRIT, "cannot add item key: %s", error);

                        exit(EXIT_FAILURE);

                }

        }

#endif

#ifdef
WITH_SPECIFIC_METRICS

        for (i = 0; NULL !=
parameters_specific[i].key; i++)

        {

                if (SUCCEED !=
add_metric(&parameters_specific[i], error, sizeof(error)))

                {

zabbix_log(LOG_LEVEL_CRIT,
"cannot add item key: %s", error);

                        exit(EXIT_FAILURE);

                }

        }

#endif

#ifdef
WITH_SIMPLE_METRICS

        for (i = 0; NULL !=
parameters_simple[i].key; i++)

        {

                if (SUCCEED !=
add_metric(&parameters_simple[i], error, sizeof(error)))

                {

                       
zabbix_log(LOG_LEVEL_CRIT, "cannot add item key: %s", error);

                        exit(EXIT_FAILURE);

                }

        }

#endif

#ifdef
WITH_HOSTNAME_METRIC

        if (SUCCEED !=
add_metric(&parameter_hostname, error, sizeof(error)))

        {

                zabbix_log(LOG_LEVEL_CRIT,
"cannot add item key: %s", error);

                exit(EXIT_FAILURE);

        }

#endif

                                                                                                
}

/*src/libs/zbxsysinfo/linux/cpu.c
各个监控项函数*/

int     SYSTEM_CPU_LOAD(AGENT_REQUEST *request,
AGENT_RESULT *result)

{

        char   
*tmp;

        int    
mode, per_cpu = 1, cpu_num;

        double 
load[ZBX_AVG_COUNT], value;

        if (2 < request->nparam)

        {      

                SET_MSG_RESULT(result,
zbx_strdup(NULL, "Too many parameters."));

                return SYSINFO_RET_FAIL;

        }

        tmp = get_rparam(request, 0);

        if (NULL == tmp || '\0' == *tmp || 0 ==
strcmp(tmp, "all"))

        {      

                per_cpu = 0;

        }

        else if (0 != strcmp(tmp,
"percpu"))

        {      

                SET_MSG_RESULT(result,
zbx_strdup(NULL, "Invalid first parameter."));

                return SYSINFO_RET_FAIL;

        }

        tmp = get_rparam(request, 1);

        if (NULL == tmp || '\0' == *tmp || 0 ==
strcmp(tmp, "avg1"))

                mode = ZBX_AVG1;

        else if (0 == strcmp(tmp,
"avg5"))

                mode = ZBX_AVG5;

        else if (0 == strcmp(tmp,
"avg15"))

                mode = ZBX_AVG15;

        else

{

                SET_MSG_RESULT(result,
zbx_strdup(NULL, "Invalid second parameter."));

                return SYSINFO_RET_FAIL;

        }

        if (mode >= getloadavg(load, 3))

        {

                SET_MSG_RESULT(result,
zbx_strdup(NULL, "Cannot obtain load average."));

                return SYSINFO_RET_FAIL;

        }

        value = load[mode];

        zabbix_log(LOG_LEVEL_TRACE,
"XUEKUN-----DEBUG----------getloadavg----->%ld", value);

       
printf("XUEKUN-----DEBUG----------getloadavg----->%ld",
value);

        if (1 == per_cpu)

        {

                if (0 >= (cpu_num =
sysconf(_SC_NPROCESSORS_ONLN)))

                {

                        SET_MSG_RESULT(result,
zbx_dsprintf(NULL, "Cannot obtain number of CPUs: %s",

                               
zbx_strerror(errno)));

                        return
SYSINFO_RET_FAIL;

                }

                value /= cpu_num;

        }

        SET_DBL_RESULT(result, value);

        return SYSINFO_RET_OK;

}

本文标签

除非注明,文章均为( 老薛 )原创,转载请保留链接: http://www.bdkyr.com/xtyw002/2728.html

zabbix_aggentd 源码分析之基础篇:等您坐沙发呢!

发表评论

6 + 0 = ?


博主微信号,很高兴为您提供帮助

随便看看

为您推荐

0