数据采集分析

发布时间 2023-07-05 16:38:35作者: 做时间的好朋友

数据采集分析

1.名词定义

  • 采集请求CollectRequest:将数据采集封装为采集请求的数据结构
  • 监控项选择器MetricSelector:用来基于不同的资源类型查询出来可以支持的监控项列表
  • 采集调度器Scheduler:将采集请求分发到不同的实例节点上;实现类可以基于kafka,通过不同机器消费消息来实现把请求调度到不同机器上
  • 生产者Producer:用于封装好采集请求,然后交给采集调度器去进行调度

2.监控项选择器MetricSelector的设计

selector接口

/**
 * Looks up the set of {@code CollectMetric}s that applies to a resource type.
 */
public interface MetricSelector {

    /**
     * Selects the metrics for the given resource type.
     *
     * @param resourceType the resource type to look up
     * @return the metrics selected for {@code resourceType}
     */
    Set<CollectMetric> select(ResourceType resourceType);

}

实现类

/**
 * {@link MetricSelector} that answers from the metric metadata held in {@code MetadataCache}.
 */
public class MetricSelectorImpl implements MetricSelector {

    /** Creates a selector; it keeps no state of its own. */
    public MetricSelectorImpl() {
    }

    /**
     * Delegates to {@code MetadataCache.metricMetadata} and returns its answer unchanged.
     *
     * @param resourceType the resource type to look up
     * @return the metrics reported by the metadata cache for {@code resourceType}
     */
    @Override
    public Set<CollectMetric> select(ResourceType resourceType) {
        return MetadataCache.metricMetadata.getMetrics(resourceType);
    }

}

监控项元数据的定义

/**
 * Index of {@code CollectMetric}s grouped by the resource type each metric reports.
 *
 * <p>The index is built once, in the constructor, from a list of collect rules; the map
 * reference is never reassigned afterwards.
 */
public class MetricMetadata {

    // Assigned exactly once by the constructor.
    private final Map<ResourceType, Set<CollectMetric>> metric;

    /**
     * Builds the index from the given rules.
     *
     * @param collectRules rules to convert into metrics; {@code null} is treated as "no rules"
     */
    public MetricMetadata(List<CollectRule> collectRules) {
        this.metric = index(collectRules);
    }

    /** Converts every rule into a metric and groups the metrics by their resource type. */
    private static Map<ResourceType, Set<CollectMetric>> index(List<CollectRule> collectRules) {
        if (collectRules == null) {
            return Maps.newConcurrentMap();
        }
        return collectRules.stream()
                .map(CollectMetric::new)
                .collect(Collectors.groupingByConcurrent(CollectMetric::getResourceType, Collectors.toSet()));
    }

    /**
     * Returns the metrics indexed under the given resource type.
     *
     * @param resourceType the resource type to look up
     * @return the indexed set, or a new empty set when nothing is indexed for that type
     */
    public Set<CollectMetric> getMetrics(ResourceType resourceType) {
        Set<CollectMetric> found = metric.get(resourceType);
        if (found != null) {
            return found;
        }
        // Only allocate the fallback when the lookup actually misses.
        return Sets.newConcurrentHashSet();
    }

}

3.采集调度器设计

调度器接口

/**
 * Hand-off point for collect requests: requests are put in with {@link #into(List)} and
 * taken out with {@link #out()}.
 */
public interface Scheduler {

    /**
     * Submits collect requests to the scheduler.
     *
     * @param requests the requests to submit
     */
    void into(List<CollectRequest> requests);

    /**
     * Takes collect requests out of the scheduler.
     *
     * @return the requests obtained by this call
     */
    List<CollectRequest> out();
}

基于kafka的调度器实现

/**
 * {@link Scheduler} that uses a Kafka topic as the hand-off queue.
 *
 * <p>{@link #into(List)} publishes every request as a JSON string to the scheduler topic;
 * {@link #out()} polls the consumer and parses the polled records back into requests.
 */
public class KafkaScheduler implements Scheduler {

    private static final Logger logger = LoggerFactory.getLogger(KafkaScheduler.class);

    protected KafkaProducer<String, String> producer;
    // The misspelled name is kept on purpose: the field is protected, so subclasses may use it.
    protected KafkaConsumer<String, String> cnsumer;

    protected String schedulerTopic;

    /**
     * Looks up the consumer and producer beans and reads the topic name from the
     * {@code kafka.topic.scheduler} property.
     */
    public KafkaScheduler() {
        this.cnsumer = BeanContext.getBean("fetchConsumer");
        this.producer = BeanContext.getBean("stringkafkaProducer");
        Environment environment = BeanContext.getApplicationContext().getEnvironment();
        this.schedulerTopic = environment.getProperty("kafka.topic.scheduler");
    }

    /**
     * Sends each collect request to the scheduler topic as a JSON string.
     *
     * <p>A failure to send one request is logged and does not stop the remaining requests.
     *
     * @param requests the requests to publish; {@code null} or empty is a no-op
     */
    @Override
    public void into(List<CollectRequest> requests) {
        if (requests == null || requests.isEmpty()) {
            return;
        }
        for (CollectRequest request : requests) {
            ProducerRecord<String, String> record = new ProducerRecord<>(schedulerTopic, JSON.toJSONString(request));
            try {
                producer.send(record, (metadata, exception) -> {
                    if (exception != null) {
                        logger.error("send kafka failed", exception);
                    }
                });
            } catch (Exception e) {
                logger.error("kafka 发送异常", e);
            }
        }
    }

    /**
     * Polls the consumer for up to ten seconds and parses every polled record into a request.
     *
     * <p>A record that cannot be parsed is logged and skipped so that it does not discard the
     * other records of the same poll.
     *
     * @return the requests parsed from this poll; empty when nothing usable was polled
     */
    @Override
    public List<CollectRequest> out() {
        List<CollectRequest> requests = Lists.newArrayList();
        ConsumerRecords<String, String> records = cnsumer.poll(Duration.ofSeconds(10));
        for (ConsumerRecord<String, String> record : records) {
            try {
                CollectRequest request = JSON.parseObject(record.value(), CollectRequest.class);
                if (request != null) {
                    requests.add(request);
                }
            } catch (RuntimeException e) {
                logger.error("discard unparsable collect request, topic: {}, partition: {}, offset: {}",
                        record.topic(), record.partition(), record.offset(), e);
            }
        }
        return requests;
    }

}

4.Producer的设计

Producer接口的设计

/**
 * A component that produces collect requests when {@link #produce()} is invoked.
 */
public interface Producer {

    /** Runs one production round. */
    void produce();
}

抽象类AbstractProducer

public abstract class AbstractProducer implements Producer {

    private static final Logger logger = LoggerFactory.getLogger(AbstractProducer.class);

    private static final String UMP_PRODUCER_KEY = "delta.fetch.producer";

    private static final String fetchProduceRequestCountKey = "delta.fetch.bizMonitor.produceRequestCount";
    private static final String fetchProduceResourceCountKey = "delta.fetch.bizMonitor.produceResourceCount";
	// 监控项选择器
    private final MetricSelector selector = new MetricSelectorImpl();
	// 调度器,将采集请求发到哪里
    private final Scheduler scheduler;

    private final RedisUtils4Jedis jedis;

    public AbstractProducer() {
        this.jedis = BeanContext.getBean(RedisUtils4Jedis.class);
        this.scheduler = new KafkaScheduler();
    }

    public abstract ResourceType resourceType();
	// redis锁名字
    public abstract String getRedisKey();

    /***
     * 基于资源类型获取要采集的监控项列表
     * @return {"serviceCode": {"origin.metric": "format.metric"}}
     */
    public Set<CollectMetric> getMetrics(ResourceType resourceType) {
        return selector.select(resourceType);
    }
    // 获取采集请求
    public Map<ProductionInfo, List<CollectRequest>> getRequest(Set<CollectResource> resources, Set<CollectMetric> metricSet) {
        throw new RuntimeException();
    }
	// 限速器,按照一定速率往调度器里放,防止压力太大。后面调用崩
    public abstract RateLimiter getRateLimiter();

    @Override
    public void produce() {
        RLock lock = jedis.getLock(getRedisKey());
        RateLimiter rateLimiter = getRateLimiter();
        CallerInfo info = null;
        try {
            info = Profiler.registerInfo(getUmpKey());
            boolean b = lock.tryLock();
            if (b) {
				// 获取采集的监控时间段范围
                long targetTime = getTargetTime();
                ResourceType resourceType = resourceType();
				// 后去要采集的资源列表
                Set<CollectResource> resourceSet = getResource(resourceType);
                logger.info("[Producer] sourceType : {} , size : {}", resourceType(), resourceSet.size());
				// ProductionInfo 对象没有业务用途,单纯为了业务监控
                Map<ProductionInfo, List<CollectRequest>> requestList = getRequest(resourceSet, getMetrics(resourceType));
                for (Map.Entry<ProductionInfo, List<CollectRequest>> productionInfoListEntry : requestList.entrySet()) {
                    ProductionInfo productionInfo = productionInfoListEntry.getKey();
                    List<CollectRequest> collectRequests = productionInfoListEntry.getValue();
                    AtomicInteger sendCount = new AtomicInteger(0);
                    for (CollectRequest request : collectRequests) {
                        rateLimiter.acquire();
                        request.setTargetTime(targetTime);
                        request.setProduceTime(System.currentTimeMillis());
						.// 将采集请求分发到kafka中
                        scheduler.into(Lists.newArrayList(request));
						// 发送技术自+1
                        sendCount.incrementAndGet();
                    }
                    Map<String, Number> requestCountMap = new HashMap<>();
                    requestCountMap.put(productionInfo.getSubType(), sendCount.get());
                    Profiler.sourceDataByNum(fetchProduceRequestCountKey, requestCountMap);

                    Map<String, Number> resourceTotalMap = new HashMap<>();
                    resourceTotalMap.put(productionInfo.getSubType(), productionInfo.getSubTypeCount());

                    Profiler.sourceDataByNum(fetchProduceResourceCountKey, resourceTotalMap);
                    logger.info("[Producer] end , sourceType : {} , resource size:{}, send count: {} ", productionInfo.getSubType(), productionInfo.getSubTypeCount(), sendCount.get());
                }
            }
        } catch (Exception e) {
            logger.error("[Producer] error, ", e);
            Profiler.functionError(info);
        } finally {
            if (lock.isLocked() && lock.isHeldByCurrentThread()) {
                lock.unlock();
            }
            Profiler.registerInfoEnd(info);
        }
    }

    private String getUmpKey() {
        return UMP_PRODUCER_KEY;
    }

    protected Set<CollectResource> getResource(ResourceType resourceType) {
        return MetadataCache.resourceMetadata.getResource(resourceType);
    }

    protected long getTargetTime() {
        return DateUtils.getBeforeUTCMinutes(0);
    }

}

生产者实现类MysqlHostProducer

/**
 * 自建数据库监控数据
 * from ; dba zabbix

 */
public class MysqlHostProducer extends AbstractProducer {

    private static final ResourceType type = ResourceType.MYSQL_HOST;

    private static final String KEY = "mysqlHostResourceWriter_v2.2";

    private static final String COLLECTOR_KEY = "zabbixCollector";

    @Override
    public ResourceType resourceType() {
        return type;
    }

	// 基于资源和监控项生成采集请求
    @Override
    public Map<ProductionInfo, List<CollectRequest>> getRequest(Set<CollectResource> resources, Set<CollectMetric> metrics) {
        List<List<CollectResource>> partition = Lists.partition(Lists.newArrayList(resources), 10);
        Map<ProductionInfo, List<CollectRequest>> resultMap = Maps.newConcurrentMap();
        List<CollectRequest> collectRequests = partition.stream()
                .map(collectResources -> createCollectRequest(collectResources, metrics, COLLECTOR_KEY))
                .collect(Collectors.toList());
        ProductionInfo productionInfo = new ProductionInfo();
        productionInfo.setResourceType(resourceType());
        productionInfo.setResourceTotal(resources.size());
        productionInfo.setSubTypeCount(collectRequests.size());
        productionInfo.setSubType(resourceType().name());
        resultMap.put(productionInfo, collectRequests);
        return resultMap;
    }

    protected CollectRequest createCollectRequest(List<CollectResource> resources, Set<CollectMetric> metric, String collector) {
        Set<String> ips = resources.stream()
                .map(CollectResource::getResourceId)
                .collect(Collectors.toSet());
        CollectRequest collectRequest = new CollectRequest();
        ZabbixRequest zabbixRequest = new ZabbixRequest(collectRequest);
        zabbixRequest.setIps(ips);
        zabbixRequest.setMetrics(metric);
        zabbixRequest.setCollector(collector);
        zabbixRequest.setEndTime(String.valueOf(DateUtils.getBeforeUTCSecond(0)));
        zabbixRequest.setStartTime(String.valueOf(DateUtils.getBeforeUTCSecond(-1)));
        zabbixRequest.setMetricSourceEnum(MetricSourceEnum.ZABBIX);
        return collectRequest;
    }

    @Override
    public String getRedisKey() {
        return KEY;
    }

    @Override
    public RateLimiter getRateLimiter() {
        return RateLimiter.create(40.0);
    }


    @Override
    protected long getTargetTime() {
        return delta.fetch.utils.DateUtils.getBeforeUTCMinutes(-1);
    }

}