集群环境下如何通过单个节点获取其他节点的日志
最近在负责重构配置中心服务的工作,要在原来的功能基础上添加几个新需求,其中一个就是通过一个节点上的配置中心,下载到整个集群中其他节点的日志。目前所有节点的日志都存放在本地磁盘,如果要实现这个功能,根据网上了解到的,大概可以有以下三种做法:
1.添加一个统一的文件系统或文档数据库,所有的日志都往这个文件系统中写。这种方式需要对所有服务进行代码修改,所以明显不可行;
2.每个节点上开启FTP服务器,各个节点的日志路径链接到FTP服务器下,从而实现各个节点日志写到各自的FTP服务器中的效果,然后通过一个节点上的配置中心去各个节点的FTP服务器中下载日志到本地,再统一打包发给前端;
3.代码中通过SSH客户端API远程登陆到其他节点,然后通过scp将需要的日志传输到本地,再统一打包发给前端。就这个需求而言,2和3差不多,可能2更简单点,但由于还一个远程抓包的需求必须使用SSH,为了统一当前采用的是第3种方式。
SSH远程登陆的方式首先需要一个SSH客户端,网上一搜直接拿过来修改一下就可以使用:
[code]/* * 连接远程linux服务器并执行相关的shell命令 */ public class SSHUtil { private static final Log log = LogFactory.getLog(SSHUtil.class); private static String DEFAULTCHARTSET = "UTF-8"; private static ThreadLocal<Connection> local = new ThreadLocal<>(); /* * 登陆远程主机 */ public static Boolean login(String ip, String username, String password) { boolean flag = false; try { Connection conn = new Connection(ip); conn.connect(); flag = conn.authenticateWithPassword(username, password); if (flag) { log.info("认证成功!"); local.set(conn); } else { log.info("认证失败!"); conn.close(); } } catch (IOException e) { log.error(e.getMessage(), e); } return flag; } /* * 远程执行shell脚本或者命令 */ public static String execute(String cmd) { String result = ""; try { Session session = local.get().openSession(); session.execCommand(cmd); result = processStdout(session.getStdout(), DEFAULTCHARTSET); if (StringUtils.isBlank(result)) { // 如果为得到标准输出为空,说明脚本执行出错了 result = processStdout(session.getStderr(), DEFAULTCHARTSET); } session.close(); } catch (IOException e) { log.error(e.getMessage(), e); } return result; } /* * 关闭连接 */ public static void close() { local.get().close(); } /* * 解析脚本执行的返回结果 */ public static String processStdout(InputStream in, String charset) { InputStream stdout = new StreamGobbler(in); StringBuilder builder = new StringBuilder(); BufferedReader br = null; try { br = new BufferedReader(new InputStreamReader(stdout, charset)); String line = null; while ((line = br.readLine()) != null) { builder.append(line + "\n"); } } catch (Exception e) { log.error(e.getMessage(), e); } finally { try { br.close(); } catch (IOException e) { log.error(e.getMessage(), e); } } return builder.toString(); } /* * 通过用户名和密码关联linux服务器 */ public static boolean connectLinux(String ip, String username, String password, String commandStr) { String returnStr = ""; boolean result = true; try { if (login(ip, username, password)) { returnStr = execute(commandStr); log.info(returnStr); } } catch (Exception e) { log.error(e.getMessage(), e); } if (StringUtils.isBlank(returnStr)) { result = false; } return result; } /* * 从其他服务器获取文件到本服务器指定目录 */ public static void scpGet(String ip, String username, String password, String remoteFile, String localDir) throws IOException { log.info("scpGet==ip:" + ip + " username:" + username + " remoteFile:" + remoteFile + " localFile:" + localDir); if (login(ip, username, password)) { SCPClient client = new SCPClient(local.get()); client.get(remoteFile, localDir); local.get().close(); } } /* * 将文件复制到其他计算机中 */ public static void scpPut(String ip, String username, String password, String localFile, String remoteDir) throws IOException { log.info("scpPut==ip:" + ip + " username:" + username + " localFile:" + localFile + " remoteDir:" + remoteDir); if (login(ip, username, password)) { SCPClient client = new SCPClient(local.get()); client.put(localFile, remoteDir); local.get().close(); } } }
有了SSH客户端后,下面的代码就非常简单了,先贴出来:
[code] @Override public String logDown(String[] servers, String startTime, String endTime) { try { long start = strToLong(startTime); long end = strToLong(endTime); List<TabNodes> list = tabNodesRepository.findInType(); CountDownLatch latch = new CountDownLatch(list.size()); for (TabNodes node : list) { log.info("打包获取:" + node.getIpaddress() + "上的相关日志"); executor.submit(new RemoteLog(servers, start, end, node.getIpaddress(), latch)); } latch.await(); log.info("合并所有节点机的相关日志"); generateAllLogTar(); } catch (Exception e) { log.error(e.getMessage(), e); return e.getMessage(); } return ResultText.SUCCESS; } private void generateAllLogTar() throws FileNotFoundException { List<String> toTar = new ArrayList<>(); File logDir = new File(LOGHOME); String[] serverLogs = logDir.list(); for (String str : serverLogs) { if (str.indexOf(TARPROFIX) > 0) toTar.add(LOGHOME + "/" + str); } tarUtils.execute(toTar, ALLLOGTAR); } class RemoteLog implements Runnable { String[] servers; long start; long end; String remoteIp; CountDownLatch latch; public RemoteLog(String[] servers, long start, long end, String remoteIp, CountDownLatch latch) { this.servers = servers; this.start = start; this.end = end; this.remoteIp = remoteIp; this.latch = latch; } @Override public void run() { String logToTarDir = LOGHOME + "/" + remoteIp + LOGDOWN; String logTarToMerge = logToTarDir + TARGZ; try { SSHUtil.login(remoteIp, username, password); SSHUtil.execute("mkdir " + logToTarDir); String result = SSHUtil.execute("ls -l " + LOGHOME + "/*.log* --time-style=long-iso"); String[] lines = result.split("\n"); for (String line : lines) { String str = line.substring(line.lastIndexOf("/") + 1, line.lastIndexOf(".log")); long time = strToLong(line.split("\\s+")[5]); for (String server : servers) { if (str.equals(server) || str.equals(server + ".event") || str.equals(server + ".eventdata") && time >= start) { String logToTar = line.substring(line.indexOf("/")); SSHUtil.execute("cp " + logToTar + " " + logToTarDir); } } } log.info("打包压缩"+remoteIp+"上的相关日志"); SSHUtil.execute("tar -zcvf " + logTarToMerge + " " + logToTarDir); if (!remoteIp.equals(LOCALADDR)) log.info("将"+remoteIp+"上的日志压缩文件移动到"+LOCALADDR); SSHUtil.execute("scp " + logTarToMerge + " " + username + "@" + LOCALADDR + ":" + LOGHOME); } catch (Exception e) { log.error(e.getMessage(),e); } finally { if (!remoteIp.equals(LOCALADDR)) { log.info("移除节点:"+remoteIp+"上的打包文件"); SSHUtil.execute("rm " + logTarToMerge); } log.info("移除节点:"+remoteIp+"上的日志暂存目录"); SSHUtil.execute("rm -rf " + logToTarDir); latch.countDown(); } } }
主要的业务逻辑如下:
1.先找出集群中所有节点列表,执行forEach操作;
2.对于每一个节点进行日志遍历,找到需要的日志,打包压缩成tar.gz文件,如果不是本机,则通过scp传输到本地,然后删除原压缩文件;
3.本地对各个节点传输过来的压缩文件进行再打包压缩,然后传给前端下载。
逻辑比较简单,但在实现过程中,碰到几个问题还有很有价值分享下的:
1.在节点很多的情况下,如果保证传输效率?如果一个一个节点去顺序操作,那么在节点较多的情况下势必很慢,而且如果中间一个节点上出现异常就会导致整个操作的中断,解决方案就是使用ExecutorService线程池,对每一个节点都execute一个任务去独立执行;
2.如果采用多线程异步的方式,那么本地线程何时去打包各个节点传输过来的日志压缩文件?由于是异步的,所以本地线程并不知道所有节点的日志什么时候传输完,如果为所有节点execute完任务后就去执行本地打包操作,那么肯定会漏掉部分甚至全部日志,解决方案是使用CountDownLatch进行同步操作,首先创建一个锁数量为节点数量的latch,并传递给节点任务,每一个节点任务执行完后,调用latch.countDown()释放一个锁,为了保证锁一定能被释放,要将这一方法放在finally代码块中。本地线程为所有节点execute完任务后,调用latch.await()进行等待,当所有节点任务线程执行完后,本地线程再执行下面的逻辑;
3.先看一下原先的SSH客户端代码:
[code]public class SSHUtil { private static final Log log = LogFactory.getLog(SSHUtil.class); private static String DEFAULTCHARTSET = "UTF-8"; private static Connection conn; public static Boolean login(String ip, String username, String password) { boolean flag = false; try { conn = new Connection(ip); conn.connect(); flag = conn.authenticateWithPassword(username, password); if (flag) { log.info("认证成功!"); } else { log.info("认证失败!"); conn.close(); } } catch (IOException e) { log.error(e.getMessage(), e); } return flag; } public static String execute(String cmd) { String result = ""; try { Session session = conn.openSession(); session.execCommand(cmd); result = processStdout(session.getStdout(), DEFAULTCHARTSET); if (StringUtils.isBlank(result)) { result = processStdout(session.getStderr(), DEFAULTCHARTSET); } session.close(); } catch (IOException e) { log.error(e.getMessage(), e); } return result; } public static void close() { conn.close(); } ...... }
原版的SSH客户端的连接Connection是一个静态变量,调用login方法时进行初始化,之后每一个execute方法使用的都是同一个Connection,在多线程情况下势必会出现问题:每一个节点任务使用的都是同一个连接,而每一个节点都会调用login方法重新登陆,从而出现了竞态条件问题,导致所有节点任务中都无法登陆远程节点。解决方案是使用ThreadLocal替换静态变量的方式,每一个节点任务中的连接都是新建的,同时存放在ThreadLocal中,使用时就去ThreadLocal中获取属于特定线程的连接,这样就避免了线程安全问题。
这样,整个功能才算相对完整地实现了,可见需求再小,需要注意的技术点却一点都不少。
- Apache Zookeeper单个节点测试环境与集群环境设置
- 通过perl统计日志中请求/响应,获取单个用户成功/失败的操作次数。
- 如何设计点击点击一个div,其他div做出对应反应,以及获取一个节点下的子节点
- 解析如何通过PHP函数获取当前运行的环境 来进行判断执行逻辑(小技巧)
- 黄聪:二、如何通过URL获取其他网页源代码内容(火狐插件扩展开发教程)
- QuickFlow-如何通过QFD and ExecuteCode获取其他列表数据
- 如何一步一步删除(Linux & UNIX)环境下 Oracle 11g 集群节点
- 通过阅读源码分析elasticsearch中分片如何分配到集群中节点
- 节点加入k8s集群如何获取token等参数值
- vue---vue中如何操作dom获取节点?.vue组件中通过mounted获取节点操作
- 如何在android环境下进行ttf解析,通过获取文件头信息得到字体名称!
- 通过系统广播,获得apk的packageName以后,如何仅通过packageName获取其他信息
- 如何一步一步删除(linux & UNIX)环境下 oracle 11g 集群节点
- 解析如何通过PHP函数获取当前运行的环境 来进行判断执行逻辑(小技巧)
- 如何通过cmdline获取panel型号的dtsi文件节点(qcom,lcd,id)
- win7 IE11下,无法通过Windows更新为其他微软产品获取更新
- [每天解决一问题系列 - 0012] 如何通过程序获取IIS站点信息
- ElasticSearch集群未连接 无法发现节点(windows环境)以及windows环境下设置服务 不能自动启动的问题
- openlaszlo中如何通过目录动态获取数据
- hbase各节点日志通过syslog转发配置