阿里云-云小站(无限量代金券发放中)
【腾讯云】云服务器、云数据库、COS、CDN、短信等热卖云产品特惠抢购

使用 Shell 轻松搞定 Linux 命令审计

32次阅读
没有评论

共计 5904 个字符,预计需要花费 15 分钟才能阅读完成。

导读 利用定制 Bash 源增加日志审计功能,并将用户操作发给 rsyslog 聚合,最后在 elasticsearch 做日志存储和查询。
使用 Shell 轻松搞定 Linux 命令审计

Pixelated word Linux made from cubes, mosaic pattern

首先,当谈到 Linux 的操作审计需求时,大多数我们希望的是还原线上服务器被人为(误)操作时执行的命令行,以及它关联的上下文。这个需求场景其实跟通用的业务日志采集一致,简单一点可以直接通过 history 将内容发给 syslog,复杂一点的采用 auditd 或 ebpf 在内核层面上捕获行为。

不过本文不打算对上述的方案做原理解释,仅仅站在一个运维小白的角度来完成日常 80%(80% 的数据来源?我也不知道,大概是二八原则)的操作审计。既然文章标题是用 Shell 来完成,由此可见今天的主题跟 Bash 脱不了关系了。

一句话概括今天的主题:利用定制 Bash 源增加日志审计功能,并将用户操作发给 rsyslog 聚合,最后在 elasticsearch 做日志存储和查询。

Linux 部分
  1. 准备一些必要的工具
  • rsyslog: 一个 Linux 上自带并兼容 syslog 语法的日志处理服务
  • jq: 一个在 shell 下处理 json 数据的小工具
  • logger: 一个可以往 syslog 输入日志的工具

这些小工具除 jq 外,大多操作系统发行版都自带,如果没有的话也可以直接用操作系统内置的包管理工具安装。

  • ash.audit.sh,并将其拷贝到 /etc/profile.d/ 目录下
  • if ["${SHELL##*/}" != "bash" ]; then
      return
    fi
    if ["${AUDIT_READY}" = "yes" ]; then
        return
    fi
    declare -rx HISTFILE="$HOME/.bash_history
    declare -rx HISTSIZE=500000
    declare -rx HISTFILESIZE=500000
    declare -rx HISTCONTROL=""declare -rx HISTIGNORE=""
    declare -rx HISTCMD
    declare -rx AUDIT_READY="yes"
    shopt -s histappend
    shopt -s cmdhist
    shopt -s histverify
    if shopt -q login_shell && [-t 0]; then
      stty -ixon
    fi
    if groups | grep -q root; then
      declare -x TMOUT=86400
      # chattr +a "$HISTFILE"
    fi
    declare -a LOGIN_INFO=($(who -mu | awk '{print $1,$2,$6}') )
    declare -rx AUDIT_LOGINUSER="${LOGIN_INFO[0]}"
    declare -rx AUDIT_LOGINPID="${LOGIN_INFO[2]}"
    declare -rx AUDIT_USER="$USER"
    declare -rx AUDIT_PID="$$"
    declare -rx AUDIT_TTY="${LOGIN_INFO[1]}"
    declare -rx AUDIT_SSH="$([-n"$SSH_CONNECTION"] && echo"$SSH_CONNECTION"| awk'{print $1":"$2"->"$3":"$4}')"
    declare -rx AUDIT_STR="$AUDIT_LOGINUSER  $AUDIT_LOGINPID  $AUDIT_TTY  $AUDIT_SSH"
    declare -rx AUDIT_TAG=$(echo -n $AUDIT_STR | sha1sum |cut -c1-12)
    declare -x AUDIT_LASTHISTLINE=""
    set +o functrace
    shopt -s extglob
    function AUDIT_DEBUG() {if [ -z "$AUDIT_LASTHISTLINE"]; then
        local AUDIT_CMD="$(fc -l -1 -1)"
        AUDIT_LASTHISTLINE="${AUDIT_CMD%%+([^ 0-9])*}"
      else
        AUDIT_LASTHISTLINE="$AUDIT_HISTLINE"
      fi
      local AUDIT_CMD="$(history 1)"
      AUDIT_HISTLINE="${AUDIT_CMD%%+([^ 0-9])*}"
      if ["${AUDIT_HISTLINE:-0}" -ne "${AUDIT_LASTHISTLINE:-0}" ] || ["${AUDIT_HISTLINE:-0}" -eq "1" ]; then
        MESSAGE=$(jq -c -n \
         --arg pwd "$PWD" \
         --arg cmd "${AUDIT_CMD##*()?(+([0-9])?(\*)+())}" \
         --arg user "$AUDIT_LOGINUSER" \
         --arg become "$AUDIT_USER" \
         --arg pid "$AUDIT_PID" \
         --arg info "${AUDIT_STR}" \
         '{cmd: $cmd, user: $user, become: $become, pid: $pid, pwd: $pwd, info: $info}')
        logger -p local6.info -t "$AUDIT_TAG" "@cee: $MESSAGE"
      fi
    }
    function AUDIT_EXIT() {
      local AUDIT_STATUS="$?"
      if [-n "$AUDIT_TTY"]; then
        MESSAGE_CLOSED=$(jq -c -n \
            --arg action "session closed" \
            --arg user "$AUDIT_LOGINUSER" \
            --arg become "$AUDIT_USER" \
            --arg pid "$AUDIT_PID" \
            --arg info "${AUDIT_STR}" \
            '{user: $user, become: $become, pid: $pid, action: $action, info: $info}')
        logger -p local6.info -t "$AUDIT_TAG" "@cee: $MESSAGE_CLOSED"
      fi
      exit "$AUDIT_STATUS"
    }
    declare -frx +t AUDIT_DEBUG
    declare -frx +t AUDIT_EXIT
    if [-n "$AUDIT_TTY"]; then
      MESSAGE_OPENED=$(jq -c -n \
          --arg action "session opened" \
          --arg user "$AUDIT_LOGINUSER" \
          --arg become "$AUDIT_USER" \
          --arg pid "$AUDIT_PID" \
          --arg info "${AUDIT_STR}" \
          '{user: $user, become: $become, pid: $pid, action: $action, info: $info}')
      logger -p local6.info -t "$AUDIT_TAG" "@cee: $MESSAGE_OPENED"
    fi
    declare -rx PROMPT_COMMAND="[-n"$AUDIT_DONE"] && echo''; AUDIT_DONE=; trap 'AUDIT_DEBUG && AUDIT_DONE=1; trap DEBUG' DEBUG
    declare -rx BASH_COMMAND
    declare -rx SHELLOPT
    trap AUDIT_EXIT EXIT

    简单说明下这个脚本,大致就是定义了 shell 的历史条目、登录超时时间、以及审计日志的格式和发送。

  • 配置 rsyslog 客户端,本地创建一个 /etc/rsyslog.d/40-audit.conf 文件,用于将本地 local6 级别的系统日志发送远端的 rsyslog 服务集中处理
  • $RepeatedMsgReduction off
    local6.info @:514
    & stop

    配置完成后,别忘了重启下 rsyslog 服务!

    数据部分

    数据部分顾名思义,用于接收并处理客户端发来的操作系统日志。这里我们用到了 rsyslog 和 elasticsearch 两个服务了。

    准备 rsyslog-elasticsearch

    要让 rsyslog 将日志发送给 elastichsearch,我们就必须安装它的 es 模块

    # Ubuntu  
    apt-get install -y rsyslog-elasticsearch rsyslog-mmjsonparse  
    #CentOS
    yum install rsyslog-elasticsearch rsyslog-mmjsonparse
    准备 ElasticSearch 服务

    为了简单部署,本文直接用 docker 快速拉起一个 ES 服务

    docker run -d --name elasticsearch  -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" elasticsearch:7.3.1
    配置 rsyslog 服务端,创建一个文件 /etc/rsyslog.d/40-audit-server.conf,用于定义日志的写入策略。
    $RepeatedMsgReduction off
    $ModLoad imudp
    $UDPServerRun 514
    module(load="mmjsonparse")          # for parsing CEE-enhanced syslog messages
    module(load="omelasticsearch")      # for outputting to Elasticsearch
    #try to parse a structured log
    # this is for index names to be like: rsyslog-YYYY.MM.DD
    template(name="rsyslog-index" type="string" string="bashaudit-%$YEAR%.%$MONTH%.%$DAY%")
    # this is for formatting our syslog in JSON with @timestamp
    template(name="json-syslog" type="list") {constant(value="{")
          constant(value="\"@timestamp\":\"")     property(name="timegenerated" dateFormat="rfc3339" date.inUTC="on")
          constant(value="\",\"host\":\"")        property(name="fromhost-ip")
          constant(value="\",\"severity\":\"")    property(name="syslogseverity-text")
          constant(value="\",\"facility\":\"")    property(name="syslogfacility-text")
          constant(value="\",\"program\":\"")     property(name="programname")
          constant(value="\",\"tag\":\"")         property(name="syslogtag"format="json")
          constant(value="\",")                   property(name="$!all-json"position.from="2")
        # closing brace is in all-json
    }
    if ($syslogfacility-text == 'local6' and $syslogseverity-text == 'info') then {action(type="mmjsonparse")
     action(type="omelasticsearch" template="json-syslog" searchIndex="rsyslog-index" dynSearchIndex="on" server="" serverport="")
            # action(type="omfile" file="/var/log/bashaudit.log")
            stop
    }

    这里采用了 rsyslog 的两个 module 来处理收集的日志

  • mmjsonparse 用于 json 格式化日志
  • omelasticsearch 用于配置 ElastichSearch
  • 配置完成重启 rsyslog 服务
  • 查询部分

    审计日志的查询我们可以使用 Kibana 或者自己根据 ElasticSearch API 进行二次开发。这里我们以 Kibana 举例。

    cat  ./kibana.yml
    server.port: 15601
    elasticsearch.hosts: ["http://:"]
    i18n.locale: "zh-CN"
    EOF
    docker run -d --ulimit nofile=1000000:1000000 --net host --name elasticsearch-audit -v ./kibana.yml:/usr/share/kibana/config/kibana.yml  --restart always docker.elastic.co/kibana/kibana-oss:7.3.1

    本地访问 http://localhost:15601 进入 kibana 配置创建一个名为 bashaudit 的索引模式

    使用 Shell 轻松搞定 Linux 命令审计

    之后,我们就能进入 Discover 中查询审计日志了,包含了基本 Shell 执行时间、来源用户、执行目录等数据。

    使用 Shell 轻松搞定 Linux 命令审计

    再进一步,我们也可以通过调用 API 的方式对审计日志做一些额外的二次开发,例如:

  • 对线上服务器热点用户统计
  • 对线上服务器做热点操作统计
  • 对线上危险 Shell 操作做告警
  • 使用 Shell 轻松搞定 Linux 命令审计

    总结

    本文讲述了采用定制 Bash 的方式,在用户登录初始化 Shell 的方式将其后续的命令行操作发送给 rsyslog 服务进行处理,并将格式化后的日志存储在 ElasticSearch 中方便辅助系统管理者在线上故障定位时使用,也可以依此对 Linux 命令行审计做可视化的二次开发。

    不过本文基于定制 Bash 的方式仍然具备很多局限性,例如:

  • 不能审计 ShellScript 内的执行逻辑;
  • 存在用其他 shell 绕过审计,如 zsh 等;
  • 可以看到要想审计到更详细的内容,光在 Bash(表面功夫)上实现并不能满足,读者可以尝试使用 snoopy 对 Shell 脚本内部做跟踪审计。

    阿里云 2 核 2G 服务器 3M 带宽 61 元 1 年,有高配

    腾讯云新客低至 82 元 / 年,老客户 99 元 / 年

    代金券:在阿里云专用满减优惠券

    正文完
    星哥说事-微信公众号
    post-qrcode
     0
    星锅
    版权声明:本站原创文章,由 星锅 于2024-07-24发表,共计5904字。
    转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
    【腾讯云】推广者专属福利,新客户无门槛领取总价值高达2860元代金券,每种代金券限量500张,先到先得。
    阿里云-最新活动爆款每日限量供应
    评论(没有评论)
    验证码
    【腾讯云】云服务器、云数据库、COS、CDN、短信等云产品特惠热卖中