1.封装后台线程BaseDaemonThread
/**
 * Common parent for background threads: every instance is marked as a daemon
 * thread while it is being constructed.
 */
public abstract class BaseDaemonThread extends Thread {

    /**
     * Builds a daemon thread around the supplied task.
     *
     * @param runnable the task handed to the {@link Thread} constructor
     */
    protected BaseDaemonThread(Runnable runnable) {
        super(runnable);
        setDaemon(true);
    }

    /**
     * Builds a daemon thread that carries the supplied name.
     *
     * @param threadName the name assigned to this thread
     */
    protected BaseDaemonThread(String threadName) {
        // The implicit no-arg Thread constructor runs first; the daemon flag and
        // the name are independent settings applied afterwards.
        setDaemon(true);
        setName(threadName);
    }
}
2.基于BaseDaemonThread设计基础心跳任务BaseHeartBeatTask
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.model;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import lombok.extern.slf4j.Slf4j;
/**
 * Daemon thread that periodically produces a heartbeat and writes it out.
 *
 * <p>Each loop iteration checks {@link ServerLifeCycleManager#isRunning()}; when it
 * returns {@code true} the task calls {@link #getHeartBeat()} followed by
 * {@link #writeHeartBeat(Object)}. Every iteration, successful or not, ends with a
 * sleep of {@code heartBeatInterval} milliseconds.
 *
 * <p>The loop ends when {@link #shutdown()} is called or when the thread is
 * interrupted while sleeping.
 *
 * @param <T> the heartbeat payload type
 */
@Slf4j
public abstract class BaseHeartBeatTask<T> extends BaseDaemonThread {

    private final String threadName;

    private final long heartBeatInterval;

    /**
     * Loop condition of {@link #run()}. Declared {@code volatile} because it is
     * cleared by {@link #shutdown()} from a different thread than the one that
     * reads it.
     */
    protected volatile boolean runningFlag;

    /**
     * @param threadName        name given to the thread and used in log messages
     * @param heartBeatInterval pause between two iterations, in milliseconds
     *                          (passed directly to {@link Thread#sleep(long)})
     */
    public BaseHeartBeatTask(String threadName, long heartBeatInterval) {
        super(threadName);
        this.threadName = threadName;
        this.heartBeatInterval = heartBeatInterval;
        this.runningFlag = true;
    }

    /**
     * Starts the thread, logging before and after the underlying {@link Thread#start()}.
     */
    @Override
    public synchronized void start() {
        log.info("Starting {}...", threadName);
        super.start();
        log.info("Started {}, heartBeatInterval: {}...", threadName, heartBeatInterval);
    }

    /**
     * Heartbeat loop. Exceptions thrown while building or writing a heartbeat are
     * logged and do not end the loop.
     */
    @Override
    public void run() {
        while (runningFlag) {
            try {
                if (!ServerLifeCycleManager.isRunning()) {
                    log.info("The current server status is {}, will not write heartBeatInfo into registry",
                            ServerLifeCycleManager.getServerStatus());
                    // The finally block below still runs, so this iteration sleeps too.
                    continue;
                }
                T heartBeat = getHeartBeat();
                writeHeartBeat(heartBeat);
            } catch (Exception ex) {
                log.error("{} task execute failed", threadName, ex);
            } finally {
                try {
                    Thread.sleep(heartBeatInterval);
                } catch (InterruptedException e) {
                    handleInterruptException(e);
                }
            }
        }
    }

    /**
     * Asks the loop to stop. The flag is observed at the next loop check, i.e. after
     * the current iteration (including its sleep) has completed.
     */
    public void shutdown() {
        runningFlag = false;
        log.warn("{} finished...", threadName);
    }

    /**
     * Handles an interrupt received while sleeping: logs it, stops the loop and
     * restores the thread's interrupt status.
     *
     * <p>The loop must be stopped here. With the interrupt status restored, every
     * later {@link Thread#sleep(long)} would throw immediately, turning the loop
     * into a busy spin that writes heartbeats and logs without any delay.
     */
    private void handleInterruptException(InterruptedException ex) {
        log.warn("{} has been interrupted", threadName, ex);
        runningFlag = false;
        Thread.currentThread().interrupt();
    }

    /**
     * Builds the heartbeat payload for the current iteration.
     *
     * @return the heartbeat handed to {@link #writeHeartBeat(Object)}
     */
    public abstract T getHeartBeat();

    /**
     * Writes out the heartbeat produced by {@link #getHeartBeat()}.
     *
     * @param heartBeat the payload to write
     */
    public abstract void writeHeartBeat(T heartBeat);
}
3.告警心跳Task的设计AlertHeartbeatTask
继承BaseHeartBeatTask,并实现了父类BaseHeartBeatTask的抽象方法getHeartBeat和writeHeartBeat
package org.apache.dolphinscheduler.alert.registry;
import org.apache.dolphinscheduler.alert.config.AlertConfig;
import org.apache.dolphinscheduler.common.model.AlertServerHeartBeat;
import org.apache.dolphinscheduler.common.model.BaseHeartBeatTask;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.registry.api.RegistryClient;
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
 * Heartbeat task of the alert server: collects process and host metrics into an
 * {@link AlertServerHeartBeat} and writes it, serialized as JSON, to the registry
 * under the alert server's path.
 */
@Slf4j
@Component
public class AlertHeartbeatTask extends BaseHeartBeatTask<AlertServerHeartBeat> {

    private final AlertConfig alertConfig;

    private final Integer processId;

    private final RegistryClient registryClient;

    private final String heartBeatPath;

    private final long startupTime;

    /**
     * Creates the task with the fixed thread name {@code "AlertHeartbeatTask"} and the
     * interval taken from {@code alertConfig.getHeartbeatInterval()}.
     *
     * <p>The startup time and process id are captured once here and reused in every
     * heartbeat; the registry path is derived from the alert server address.
     *
     * @param alertConfig    supplies the heartbeat interval and the alert server address
     * @param registryClient client used to write the heartbeat to the registry
     */
    public AlertHeartbeatTask(AlertConfig alertConfig,
                              RegistryClient registryClient) {
        super("AlertHeartbeatTask", alertConfig.getHeartbeatInterval().toMillis());
        this.startupTime = System.currentTimeMillis();
        this.alertConfig = alertConfig;
        this.registryClient = registryClient;
        this.heartBeatPath =
                RegistryNodeType.ALERT_SERVER.getRegistryPath() + "/" + alertConfig.getAlertServerAddress();
        this.processId = OSUtils.getProcessID();
    }

    /**
     * Builds a heartbeat snapshot: the cached process id and startup time, the current
     * time as report time, and the CPU / memory figures reported by {@link OSUtils}.
     *
     * @return a freshly built heartbeat
     */
    @Override
    public AlertServerHeartBeat getHeartBeat() {
        return AlertServerHeartBeat.builder()
                .processId(processId)
                .startupTime(startupTime)
                .reportTime(System.currentTimeMillis())
                .cpuUsage(OSUtils.cpuUsagePercentage())
                .memoryUsage(OSUtils.memoryUsagePercentage())
                .availablePhysicalMemorySize(OSUtils.availablePhysicalMemorySize())
                .alertServerAddress(alertConfig.getAlertServerAddress())
                .build();
    }

    /**
     * Serializes the heartbeat to JSON and stores it at the alert server's registry
     * path via {@code persistEphemeral}.
     *
     * @param heartBeat the heartbeat to write
     */
    @Override
    public void writeHeartBeat(AlertServerHeartBeat heartBeat) {
        String heartBeatJson = JSONUtils.toJsonString(heartBeat);
        registryClient.persistEphemeral(heartBeatPath, heartBeatJson);
        // The message names the alert server; it previously said "master", a leftover
        // from the master heartbeat task that made the log line misleading.
        log.debug("Success write alert server heartBeatInfo into registry, alertServerRegistryPath: {}, heartBeatInfo: {}",
                heartBeatPath, heartBeatJson);
    }
}