HeartBeatTask发送心跳的后台线程相关设计

发布时间: 2023-07-10 10:37:38 作者: 做时间的好朋友

1.封装后台线程BaseDaemonThread

/**
 * Parent class for threads that always run as daemon threads.
 *
 * <p>Both constructors mark the new thread as a daemon before it can be started.
 */
public abstract class BaseDaemonThread extends Thread {

    /**
     * Creates a daemon thread that runs the given task.
     *
     * @param runnable the task executed when the thread is started
     */
    protected BaseDaemonThread(Runnable runnable) {
        super(runnable);
        setDaemon(true);
    }

    /**
     * Creates a daemon thread with the given name; subclasses are expected to
     * override {@link #run()} since no task is supplied.
     *
     * @param threadName the name assigned to the thread
     */
    protected BaseDaemonThread(String threadName) {
        // The implicit no-arg Thread constructor runs first; the name is applied afterwards.
        setDaemon(true);
        setName(threadName);
    }

}

2.基于BaseDaemonThread设计心跳任务基类BaseHeartBeatTask

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.dolphinscheduler.common.model;

import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;

import lombok.extern.slf4j.Slf4j;

/**
 * Base class for daemon threads that periodically build a heartbeat and write it out.
 *
 * <p>Subclasses supply the heartbeat payload via {@link #getHeartBeat()} and persist it via
 * {@link #writeHeartBeat(Object)}. The loop in {@link #run()} repeats every
 * {@code heartBeatInterval} milliseconds until {@link #shutdown()} is called or the thread
 * is interrupted.
 *
 * @param <T> the heartbeat payload type
 */
@Slf4j
public abstract class BaseHeartBeatTask<T> extends BaseDaemonThread {

    private final String threadName;
    private final long heartBeatInterval;

    // volatile: written by shutdown() from another thread and read by the heartbeat loop,
    // so the stop request must be visible to the running thread.
    protected volatile boolean runningFlag;

    /**
     * @param threadName        name of the heartbeat thread, also used in log messages
     * @param heartBeatInterval pause between two heartbeat rounds, in milliseconds
     */
    public BaseHeartBeatTask(String threadName, long heartBeatInterval) {
        super(threadName);
        this.threadName = threadName;
        this.heartBeatInterval = heartBeatInterval;
        this.runningFlag = true;
    }

    /**
     * Starts the heartbeat thread, logging before and after the start.
     */
    @Override
    public synchronized void start() {
        log.info("Starting {}...", threadName);
        super.start();
        log.info("Started {}, heartBeatInterval: {}...", threadName, heartBeatInterval);
    }

    /**
     * Heartbeat loop: while the task is running, builds and writes one heartbeat per round
     * and then sleeps for the configured interval. A round is skipped (but still followed
     * by the sleep) when {@code ServerLifeCycleManager.isRunning()} is false. Exceptions
     * from a round are logged and do not stop the loop; an interrupt does.
     */
    @Override
    public void run() {
        while (runningFlag) {
            try {
                if (!ServerLifeCycleManager.isRunning()) {
                    log.info("The current server status is {}, will not write heartBeatInfo into registry",
                            ServerLifeCycleManager.getServerStatus());
                    // The finally block below still runs, so the loop does not spin.
                    continue;
                }
                T heartBeat = getHeartBeat();
                writeHeartBeat(heartBeat);
            } catch (Exception ex) {
                log.error("{} task execute failed", threadName, ex);
            } finally {
                try {
                    Thread.sleep(heartBeatInterval);
                } catch (InterruptedException e) {
                    handleInterruptException(e);
                }
            }
        }
    }

    /**
     * Requests the heartbeat loop to stop; the loop exits the next time it checks the flag.
     */
    public void shutdown() {
        runningFlag = false;
        log.warn("{} finished...", threadName);
    }

    /**
     * Handles an interrupt received while sleeping: restores the interrupt status and stops
     * the loop. Without stopping, the restored interrupt status would make every subsequent
     * {@code Thread.sleep} throw immediately, turning the loop into a busy spin.
     */
    private void handleInterruptException(InterruptedException ex) {
        log.warn("{} has been interrupted", threadName, ex);
        Thread.currentThread().interrupt();
        runningFlag = false;
    }

    /**
     * Builds the heartbeat payload for the current round.
     *
     * @return the heartbeat to be written
     */
    public abstract T getHeartBeat();

    /**
     * Writes the given heartbeat to its destination.
     *
     * @param heartBeat the payload produced by {@link #getHeartBeat()}
     */
    public abstract void writeHeartBeat(T heartBeat);
}

3.告警心跳Task的设计AlertHeartbeatTask

继承BaseHeartBeatTask,并实现了父类BaseHeartBeatTask定义的两个抽象方法getHeartBeat和writeHeartBeat

package org.apache.dolphinscheduler.alert.registry;

import org.apache.dolphinscheduler.alert.config.AlertConfig;
import org.apache.dolphinscheduler.common.model.AlertServerHeartBeat;
import org.apache.dolphinscheduler.common.model.BaseHeartBeatTask;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.registry.api.RegistryClient;
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;

import lombok.extern.slf4j.Slf4j;

import org.springframework.stereotype.Component;

/**
 * Heartbeat task of the alert server: periodically builds an {@link AlertServerHeartBeat}
 * and writes it as JSON into the registry under the alert server's path.
 */
@Slf4j
@Component
public class AlertHeartbeatTask extends BaseHeartBeatTask<AlertServerHeartBeat> {

    private final AlertConfig alertConfig;
    private final Integer processId;
    private final RegistryClient registryClient;
    private final String heartBeatPath;
    private final long startupTime;

    /**
     * @param alertConfig    supplies the heartbeat interval and the alert server address
     * @param registryClient client used to write the heartbeat into the registry
     */
    public AlertHeartbeatTask(AlertConfig alertConfig,
                              RegistryClient registryClient) {
        super("AlertHeartbeatTask", alertConfig.getHeartbeatInterval().toMillis());
        // Captured once at construction and reported unchanged in every heartbeat.
        this.startupTime = System.currentTimeMillis();
        this.alertConfig = alertConfig;
        this.registryClient = registryClient;
        // Registry path: <alert server registry path>/<alert server address>
        this.heartBeatPath =
                RegistryNodeType.ALERT_SERVER.getRegistryPath() + "/" + alertConfig.getAlertServerAddress();
        this.processId = OSUtils.getProcessID();
    }

    /**
     * Builds a heartbeat carrying the process id, startup time, current report time,
     * CPU/memory figures read from {@code OSUtils}, and the alert server address.
     *
     * @return the heartbeat for the current round
     */
    @Override
    public AlertServerHeartBeat getHeartBeat() {
        return AlertServerHeartBeat.builder()
                .processId(processId)
                .startupTime(startupTime)
                .reportTime(System.currentTimeMillis())
                .cpuUsage(OSUtils.cpuUsagePercentage())
                .memoryUsage(OSUtils.memoryUsagePercentage())
                .availablePhysicalMemorySize(OSUtils.availablePhysicalMemorySize())
                .alertServerAddress(alertConfig.getAlertServerAddress())
                .build();
    }

    /**
     * Serializes the heartbeat to JSON and writes it to the registry at the heartbeat path.
     *
     * @param heartBeat the heartbeat to persist
     */
    @Override
    public void writeHeartBeat(AlertServerHeartBeat heartBeat) {
        String heartBeatJson = JSONUtils.toJsonString(heartBeat);
        registryClient.persistEphemeral(heartBeatPath, heartBeatJson);
        // Message names the alert server; the previous wording referred to the master server.
        log.debug("Success write alert server heartBeatInfo into registry, alertRegistryPath: {}, heartBeatInfo: {}",
                heartBeatPath, heartBeatJson);
    }
}