来!做一个分钟级业务监控系统 【实战】

2019/7/21 11:27:03 人评论 次浏览 分类:学习教程

  如何做一个实时的业务统计的监控?比如分钟级?也就是每分钟可以快速看到业务的变化趋势,及可以做一些简单的分组查询?

  哎,你可能说很简单了,直接从数据库 count 就可以了! 你是对的。

  但如果不允许你使用db进行count呢?因为线上数据库资源可是很宝贵的哦,你这一count可能会给db带来灾难了。

那不然咋整?

没有db,我们还有其他数据源嘛,比如: 消息队列?埋点数据? 本文将是基于该前提而行。

  

做监控,尽量不要侵入业务太多!所以有一个消息中间件是至关重要的。针对大数据系统,一般是: kafka 或者 类kafka. (如本文基础 loghub)

  有了消息中间件,如何进行分钟级监控? 这个应该就很简单了吧。不过如果要自己实现,其实坑也不少的!

如果自己实现计数,那么你可能需要做以下几件事:

  1. 每消费一个消息,你需要一个累加器;
  2. 每隔一个周期,你可能需要一个归档操作;
  3. 你可能需要考虑各种并发安全问题;
  4. 你可能需要考虑种性能问题;
  5. 你可能需要考虑各种机器故障问题;
  6. 你可能需要考虑各种边界值问题;

  哎,其实没那么难。时间序列数据库,就专门为这类事情而生!如OpenTSDB: http://opentsdb.net/overview.html

  可以说,TSDB 是这类应用场景的杀手锏。或者基于流计算框架: 如flink, 也是很轻松完成的事。但是不是本文的方向,略过!

本文是基于 loghub 的现有数据,进行分钟级统计后,入库 mysql 中,从而支持随时查询。(因loghub每次查询都是要钱的,所以,不可能直接查询)

  loghub 数据结构如: 2019-07-10 10:01:11,billNo,userId,productCode,...

  由于loghub提供了很多强大的查询统计功能,所以我们可以直接使用了。

  核心功能就是一个统计sql,还是比较简单的。但是需要考虑的点也不少,接下来,将为看官们奉上一个完整的解决方案!

撸代码去!

1. 核心统计任务实现类 MinuteBizDataCounterTask

import com.aliyun.openservices.log.Client;  import com.aliyun.openservices.log.common.LogContent;  import com.aliyun.openservices.log.common.LogItem;  import com.aliyun.openservices.log.common.QueriedLog;  import com.aliyun.openservices.log.exception.LogException;  import com.aliyun.openservices.log.response.GetLogsResponse;  import com.my.service.statistics.StatisticsService;  import com.my.entity.BizDataStatisticsMin;  import com.my.model.LoghubQueryCounterOffsetModel;  import com.my.util.loghub.LogHubProperties;  import lombok.extern.slf4j.Slf4j;  import org.springframework.beans.factory.annotation.Value;  import org.springframework.stereotype.Component;    import javax.annotation.Resource;  import java.math.BigDecimal;  import java.time.LocalDateTime;  import java.time.ZoneOffset;  import java.util.ArrayList;  import java.util.List;    /**   * 基于loghub 的分钟级 统计任务   */  @Component  @Slf4j  public class MinuteBizDataCounterTask implements Runnable {        @Resource      private LogHubProperties logHubProperties;        @Resource      private StatisticsService statisticsService;        @Resource(name = "defaultOffsetQueryTaskCallback")      private DefaultOffsetQueryTaskCallbackImpl defaultOffsetQueryTaskCallback;        /**       * loghub 客户端       */      private volatile Client mClient;        /**       * 过滤的topic       */      private static final String LOGHUB_TOPIC = "topic_test";        /**       * 单次扫描loghub最大时间 间隔分钟数       */      @Value("${loghub.offset.counter.perScanMaxMinutesGap}")      private Integer perScanMaxMinutesGap;        /**       * 单次循环最大数       */      @Value("${loghub.offset.counter.perScanMaxRecordsLimit}")      private Integer perScanMaxRecordsLimit;        /**       * 构造必要实例信息       */      public ProposalPolicyBizDataCounterTask() {        }        @Override      public void run() {          if(mClient == null) {              this.mClient = new Client(logHubProperties.getEndpoint(),                                  logHubProperties.getAccessKeyId(), logHubProperties.getAccessKey());          }          while (!Thread.interrupted()) {              try {                  updateLastMinutePolicyNoCounter();                  Thread.sleep(60000);              }              catch (InterruptedException e) {                  log.error("【分钟级统计task】, sleep 中断", e);                  Thread.currentThread().interrupt();              }              catch (Exception e) {                  // 注意此处可能有风险,发生异常后将快速死循环                  log.error("【分钟级统计task】更新异常", e);                  try {                      Thread.sleep(10000);                  }                  catch (InterruptedException ex) {                      log.error("【分钟级统计task】异常,且sleep异常", ex);                      Thread.currentThread().interrupt();                  }              }          }      }        /**       * 更新最近的数据 (分钟级)       *       * @throws LogException loghub查询异常时抛出       */      private void updateLastMinutePolicyNoCounter() throws LogException {          updateMinutePolicyNoCounter(null);      }        /**       * 更新最近的数据       */      public Integer updateMinutePolicyNoCounter(LoghubQueryCounterOffsetModel specifyOffset) throws LogException {          // 1. 获取偏移量          // 2. 根据偏移量,判定是否可以一次性取完,或者多次获取更新          // 3. 从loghub中设置偏移量,获取统计数据,更新          // 4. 更新db数据统计值          // 5. 更新偏移量          // 6. 等待下一次更新            // 指定offset时,可能为补数据          final LoghubQueryCounterOffsetModel destOffset = enhanceQueryOffset(specifyOffset);          initSharedQueryOffset(destOffset, destOffset == specifyOffset);            Integer totalAffectNum = 0;            while (!isScanFinishOnDestination(destOffset)) {              // 完整扫描一次时间周期              calcNextSharedQueryOffset(destOffset);              while (true) {                  calcNextInnerQueryOffset();                  ArrayList logs = queryPerMinuteStatisticFromLoghubOnCurrentOffset();                  Integer affectNum = handleMiniOffsetBatchCounter(logs);                  totalAffectNum += affectNum;                  log.info("【分钟级统计task】本次更新数据:{}, offset:{}", affectNum, getCurrentSharedQueryOffset());                  if(!hasMoreDataOffset(logs.size())) {                      rolloverOffsetAndCommit();                      break;                  }              }          }          log.info("【分钟级统计task】本次更新数据,总共:{}, destOffset:{}, curOffset:{}",                              totalAffectNum, destOffset, getCurrentSharedQueryOffset());          rolloverOffsetAndCommit();          return totalAffectNum;      }        /**       * 处理一小批的统计数据       *       * @param logs 小批统计loghub数据       * @return 影响行数       */      private Integer handleMiniOffsetBatchCounter(ArrayList logs) {          if (logs == null || logs.isEmpty()) {              return 0;          }          List statisticsMinList = new ArrayList<>();          for (QueriedLog log1 : logs) {              LogItem getLogItem = log1.GetLogItem();              BizDataStatisticsMin statisticsMin1 = adaptStatisticsMinDbData(getLogItem);              statisticsMin1.setEventCode(PROPOSAL_FOUR_IN_ONE_TOPIC);              statisticsMin1.setEtlVersion(getCurrentScanTimeDuring() + ":" + statisticsMin1.getStatisticsCount());              statisticsMinList.add(statisticsMin1);          }          return statisticsService.batchUpsertPremiumStatistics(statisticsMinList, getCurrentOffsetCallback());      }          /**       * 获取共享偏移信息       *       * @return 偏移       */      private LoghubQueryCounterOffsetModel getCurrentSharedQueryOffset() {          return defaultOffsetQueryTaskCallback.getCurrentOffset();      }        /**       * 判断本次是否扫描完成       *       * @param destOffset 目标偏移       * @return true:扫描完成, false: 未完成       */      private boolean isScanFinishOnDestination(LoghubQueryCounterOffsetModel destOffset) {          return defaultOffsetQueryTaskCallback.getEndTime() >= destOffset.getEndTime();      }        /**       * 获取偏移提交回调器       *       * @return 回调实例       */      private OffsetQueryTaskCallback getCurrentOffsetCallback() {          return defaultOffsetQueryTaskCallback;      }        /**       * 初始化共享的查询偏移变量       *       * @param destOffset 目标偏移       * @param isSpecifyOffset 是否是手动指定的偏移       */      private void initSharedQueryOffset(LoghubQueryCounterOffsetModel destOffset, boolean isSpecifyOffset) {          // 整分花时间数据          Integer queryStartTime = destOffset.getStartTime();          if(queryStartTime % 60 != 0) {              queryStartTime = queryStartTime / 60 * 60;          }          // 将目标扫描时间终点 设置为起点,以备后续迭代          defaultOffsetQueryTaskCallback.initCurrentOffset(queryStartTime, queryStartTime,                                                          destOffset.getOffsetStart(), destOffset.getLimit(),                                                          destOffset.getIsNewStep(), isSpecifyOffset);          if(defaultOffsetQueryTaskCallback.getIsNewStep()) {              resetOffsetDefaultSettings();          }      }        /**       * 计算下一次统计偏移时间       *       * @param destOffset 目标偏移值       */      private void calcNextSharedQueryOffset(LoghubQueryCounterOffsetModel destOffset) {          int perScanMaxSecondsGap = perScanMaxMinutesGap * 60;          if(destOffset.getEndTime() - defaultOffsetQueryTaskCallback.getStartTime() > perScanMaxSecondsGap) {              defaultOffsetQueryTaskCallback.setStartTime(defaultOffsetQueryTaskCallback.getEndTime());              int nextExpectEndTime = defaultOffsetQueryTaskCallback.getStartTime() + perScanMaxSecondsGap;              if(nextExpectEndTime > destOffset.getEndTime()) {                  nextExpectEndTime = destOffset.getEndTime();              }              defaultOffsetQueryTaskCallback.setEndTime(nextExpectEndTime);          }          else {              defaultOffsetQueryTaskCallback.setStartTime(defaultOffsetQueryTaskCallback.getEndTime());              defaultOffsetQueryTaskCallback.setEndTime(destOffset.getEndTime());          }          resetOffsetDefaultSettings();      }        /**       * 重置偏移默认配置       */      private void resetOffsetDefaultSettings() {          defaultOffsetQueryTaskCallback.setIsNewStep(true);          defaultOffsetQueryTaskCallback.setOffsetStart(0);          defaultOffsetQueryTaskCallback.setLimit(0);      }        /**       * 计算下一次小偏移,此种情况应对 一次外部偏移未查询完成的情况       */      private void calcNextInnerQueryOffset() {          defaultOffsetQueryTaskCallback.setIsNewStep(false);          // 第一次计算时,limit 为0, 所以得出的 offsetStart 也是0          defaultOffsetQueryTaskCallback.setOffsetStart(                  defaultOffsetQueryTaskCallback.getOffsetStart() + defaultOffsetQueryTaskCallback.getLimit());          defaultOffsetQueryTaskCallback.setLimit(perScanMaxRecordsLimit);      }        /**       * 获取当前循环的扫描区间       *       * @return 15567563433-1635345099 区间       */      private String getCurrentScanTimeDuring() {          return defaultOffsetQueryTaskCallback.getStartTime() + "-" + defaultOffsetQueryTaskCallback.getEndTime();      }        /**       * 从loghub查询每分钟的统计信息       *       * @return 查询到的统计信息       * @throws LogException loghub 异常时抛出       */      private ArrayList queryPerMinuteStatisticFromLoghubOnCurrentOffset() throws LogException {          // 先按保单号去重,再进行计数统计          String countSql = "* | split(bizData, ',')[5] policyNo, bizData GROUP by split(bizData, ',')[5] " +                  " | select count(1) as totalCountMin, " +                  "split(bizData, ',')[2] as productCode," +                  "split(bizData, ',')[3] as schemaCode," +                  "split(bizData, ',')[4] as channelCode," +                  "substr(split(bizData, ',')[1], 1, 16) as myDateTimeMinute " +                  "group by substr(split(bizData, ',')[1], 1, 16), split(bizData, ',')[2],split(bizData, ',')[3], split(bizData, ',')[4],split(bizData, ',')[7], split(bizData, ',')[8]";          countSql += " limit " + defaultOffsetQueryTaskCallback.getOffsetStart() + "," + defaultOffsetQueryTaskCallback.getLimit();          GetLogsResponse countResponse = mClient.GetLogs(logHubProperties.getProjectName(), logHubProperties.getBizCoreDataLogStore(),                  defaultOffsetQueryTaskCallback.getStartTime(), defaultOffsetQueryTaskCallback.getEndTime(),                  LOGHUB_TOPIC, countSql);          if(!countResponse.IsCompleted()) {              log.error("【分钟级统计task】扫描获取到未完整的数据,请速检查原因,offSet:{}", getCurrentSharedQueryOffset());          }          return countResponse.GetLogs() == null                      ? new ArrayList<>()                      : countResponse.GetLogs();      }        /**       * 根据上一次返回的记录数量,判断是否还有更多数据       *       * @param lastGotRecordsCount 上次返回的记录数 (数据量大于最大数说明还有未取完数据)       * @return true: 是还有更多数据应该再循环获取, false: 无更多数据结束本期任务       */      private boolean hasMoreDataOffset(int lastGotRecordsCount) {          return lastGotRecordsCount >= perScanMaxRecordsLimit;      }        /**       * 加强版的 offset 优先级: 指定偏移 -> 基于缓存的偏移 -> 新生成偏移标识       *       * @param specifyOffset 指定偏移(如有)       * @return 偏移标识       */      private LoghubQueryCounterOffsetModel enhanceQueryOffset(LoghubQueryCounterOffsetModel specifyOffset) {          if(specifyOffset != null) {              return specifyOffset;          }          LoghubQueryCounterOffsetModel offsetBaseOnCache = getNextOffsetBaseOnCache();          if(offsetBaseOnCache != null) {              return offsetBaseOnCache;          }          return generateNewOffset();      }        /**       * 基于缓存获取一下偏移标识       *       * @return 偏移       */      private LoghubQueryCounterOffsetModel getNextOffsetBaseOnCache() {          LoghubQueryCounterOffsetModel offsetFromCache = defaultOffsetQueryTaskCallback.getCurrentOffsetFromCache();          if(offsetFromCache == null) {              return null;          }          LocalDateTime now = LocalDateTime.now();          LocalDateTime nowMinTime = LocalDateTime.of(now.getYear(), now.getMonth(), now.getDayOfMonth(),                                                      now.getHour(), now.getMinute());          // 如果上次仍未内部循环完成,则使用原来的          if(offsetFromCache.getIsNewStep()) {              offsetFromCache.setStartTime(offsetFromCache.getEndTime());              long endTime = nowMinTime.toEpochSecond(ZoneOffset.of("+8"));              offsetFromCache.setEndTime((int) endTime);          }          return offsetFromCache;      }        /**       * 生成新的完整的 偏移标识       *       * @return 新偏移       */      private LoghubQueryCounterOffsetModel generateNewOffset() {          LoghubQueryCounterOffsetModel offsetNew = new LoghubQueryCounterOffsetModel();          LocalDateTime now = LocalDateTime.now();          LocalDateTime nowMinTime = LocalDateTime.of(now.getYear(), now.getMonth(), now.getDayOfMonth(),                  now.getHour(), now.getMinute());          long startTime = nowMinTime.minusDays(1).toEpochSecond(ZoneOffset.of("+8"));          long endTime = nowMinTime.toEpochSecond(ZoneOffset.of("+8"));          offsetNew.setStartTime((int) startTime);          offsetNew.setEndTime((int) endTime);          return offsetNew;      }      /**       * 将日志返回数据 适配到数据库记录中       *       * @param logItem 日志详情       * @return db数据结构对应       */      private BizDataStatisticsMin adaptStatisticsMinDbData(LogItem logItem) {          ArrayList logContents = logItem.GetLogContents();          BizDataStatisticsMin statisticsMin1 = new BizDataStatisticsMin();          for (LogContent logContent : logContents) {              switch (logContent.GetKey()) {                  case "totalCountMin":                      statisticsMin1.setStatisticsCount(Integer.valueOf(logContent.GetValue()));                      break;                  case "productCode":                      statisticsMin1.setProductCode(logContent.GetValue());                      break;                  case "myDateTimeMinute":                      String signDtMinStr = logContent.GetValue();                      String[] dateTimeArr = signDtMinStr.split(" ");                      String countDate = dateTimeArr[0];                      String[] timeArr = dateTimeArr[1].split(":");                      String countHour = timeArr[0];                      String countMin = timeArr[1];                      statisticsMin1.setCountDate(countDate);                      statisticsMin1.setCountHour(countHour);                      statisticsMin1.setCountMin(countMin);                      break;                  default:                      break;              }          }          return statisticsMin1;      }        /**       * 重置默认值,同时提交当前 (滚动到下一个偏移点)       */      private void rolloverOffsetAndCommit() {          resetOffsetDefaultSettings();          commitOffsetSync();      }        /**       * 提交偏移量       *       */      private void commitOffsetSync() {          defaultOffsetQueryTaskCallback.commit();      }    }

  主要实现逻辑如下:

    1. 每隔一分钟进行一个查询;
    2. 发生异常后,容错继续查询;
    3. 对于一个新统计,默认倒推一天范围进行统计;
    4. 统计时间范围间隔可设置,避免一次查询数量太大,费用太高且查询返回数量有限;
    5. 对于每次小批量查询,支持分布操作,直到取完数据;
    6. 小批量数据完成后,自动提交查询偏移;
    7. 后续查询将基础提交的偏移进行;
    8. 支持断点查询;

2. 偏移提交管理器 OffsetQueryTaskCallback

  主任务中,只管进行数据统计查询,提交偏移操作由其他类进行;

/**   * 普通任务回调接口定义, 考虑到多种类型的统计任务偏移操作方式可能不一,定义一个通用型偏移接口   *   */  public interface OffsetQueryTaskCallback {        /**       * 回调方法入口, 提交偏移       */      public void commit();        /**       * 设置初始化绑定当前偏移(期间不得改变)       *       * @param startTime 偏移开始时间       * @param endTime 偏移结束时间       * @param offsetStart 偏移开始值(分页)       * @param limit 单次取值最大数(分页)       * @param isNewStep 是否是新的查询       * @param isSpecifyOffset 是否是指定的偏移       */      public void initCurrentOffset(Integer startTime, Integer endTime,                                    Integer offsetStart, Integer limit,                                    Boolean isNewStep, Boolean isSpecifyOffset);        /**       * 从当前环境中获取当前偏移信息       *       * @return 偏移变量实例       */      public LoghubQueryCounterOffsetModel getCurrentOffset();    }      import com.alibaba.fastjson.JSONObject;  import com.my.util.constants.RedisKeysConstantEnum;  import com.my.util.redis.RedisPoolUtil;  import com.my.model.LoghubQueryCounterOffsetModel;  import lombok.extern.slf4j.Slf4j;  import org.apache.commons.lang3.StringUtils;  import org.springframework.stereotype.Component;    import javax.annotation.Resource;    /**   * 默认偏移回调实现   *   */  @Component("defaultOffsetQueryTaskCallback")  @Slf4j  public class DefaultOffsetQueryTaskCallbackImpl implements OffsetQueryTaskCallback {        @Resource      private RedisPoolUtil redisPoolUtil;        /**       * 当前偏移信息       */      private ThreadLocal currentOffsetHolder = new ThreadLocal<>();          @Override      public void commit() {          if(!currentOffsetHolder.get().getIsSpecifyOffset()) {              redisPoolUtil.set(RedisKeysConstantEnum.STATISTICS_COUNTER_OFFSET_CACHE_KEY.getRedisKey(),                      JSONObject.toJSONString(currentOffsetHolder.get()));          }      }        @Override      public void initCurrentOffset(Integer startTime, Integer endTime,                                    Integer offsetStart, Integer limit,                                    Boolean isNewStep, Boolean isSpecifyOffset) {          LoghubQueryCounterOffsetModel currentOffset = new LoghubQueryCounterOffsetModel();          currentOffset.setStartTime(startTime);          currentOffset.setEndTime(endTime);          currentOffset.setOffsetStart(offsetStart);          currentOffset.setIsNewStep(isNewStep);          currentOffset.setIsSpecifyOffset(isSpecifyOffset);          currentOffsetHolder.set(currentOffset);      }        @Override      public LoghubQueryCounterOffsetModel getCurrentOffset() {          return currentOffsetHolder.get();      }        /**       * 从缓存中获取当前偏移信息       *       * @return 缓存偏移或者 null       */      public LoghubQueryCounterOffsetModel getCurrentOffsetFromCache() {          String offsetCacheValue = redisPoolUtil.get(RedisKeysConstantEnum.STATISTICS_COUNTER_OFFSET_CACHE_KEY.getRedisKey());          if (StringUtils.isBlank(offsetCacheValue)) {              return null;          }          return JSONObject.parseObject(offsetCacheValue, LoghubQueryCounterOffsetModel.class);      }        public Integer getStartTime() {          return currentOffsetHolder.get().getStartTime();      }        public void setStartTime(Integer startTime) {          currentOffsetHolder.get().setStartTime(startTime);      }        public Integer getEndTime() {          return currentOffsetHolder.get().getEndTime();      }        public void setEndTime(Integer endTime) {          currentOffsetHolder.get().setEndTime(endTime);      }        public Integer getOffsetStart() {          return currentOffsetHolder.get().getOffsetStart();      }        public void setOffsetStart(Integer offsetStart) {          currentOffsetHolder.get().setOffsetStart(offsetStart);      }        public Integer getLimit() {          return currentOffsetHolder.get().getLimit();      }        public void setLimit(Integer limit) {          currentOffsetHolder.get().setLimit(limit);      }        public Boolean getIsNewStep() {          return currentOffsetHolder.get().getIsNewStep();      }        public void setIsNewStep(Boolean isNewStep) {          currentOffsetHolder.get().setIsNewStep(isNewStep);      }    }    /**   * loghub 查询偏移量 数据容器   *   */  @Data  public class LoghubQueryCounterOffsetModel implements Serializable {        private static final long serialVersionUID = -3749552331349228045L;        /**       * 开始时间       */      private Integer startTime;        /**       * 结束时间       */      private Integer endTime;        /**       * 起始偏移       */      private Integer offsetStart = 0;        /**       * 每次查询的 条数限制, 都需要进行设置后才可用, 否则查无数据       */      private Integer limit = 0;        /**       * 是否新的偏移循环,如未完成,应继续子循环 limit       *       * true: 是, offsetStart,limit 失效, false: 否, 需借助 offsetStart,limit 进行limit相加       */      private Boolean isNewStep = true;        /**       * 是否是手动指定的偏移,如果是说明是在手动被数据,偏移量将不会被更新       *       *      此变量是瞬时值,将不会被持久化到偏移标识中       */      private transient Boolean isSpecifyOffset;    }

3. 批量更新统计结果数据库的实现

  因每次统计的数据量是不确定的,因尽可能早的提交一次统计结果,防止一次提交太多,或者 机器故障时所有统计白费,所以需要分小事务进行。

      @Service  public class StatisticsServiceImpl implements StatisticsService {      /**       * 批量更新统计分钟级数据 (事务型提交)       *       * @param statisticsMinList 新统计数据       * @return 影响行数       */      @Transactional(isolation = Isolation.READ_COMMITTED, rollbackFor = Throwable.class)      public Integer batchUpsertPremiumStatistics(List statisticsMinList,              OffsetQueryTaskCallback callback) {          AtomicInteger updateCount = new AtomicInteger(0);          statisticsMinList.forEach(item -> {              int affectNum = 0;              BizProposalPolicyStatisticsMin oldStatistics = bizProposalPolicyStasticsMinMapper.selectOneByCond(item);              if (oldStatistics == null) {                  item.setEtlVersion(item.getEtlVersion() + ":0");                  affectNum = bizProposalPolicyStasticsMinMapper.insert(item);              } else {                  oldStatistics.setStatisticsCount(oldStatistics.getStatisticsCount() + item.getStatisticsCount());                  String versionFull = versionKeeperFilter(oldStatistics.getEtlVersion(), item.getEtlVersion());                  oldStatistics.setEtlVersion(versionFull + ":" + oldStatistics.getStatisticsCount());                  // todo: 优化更新版本号问题                  affectNum = bizProposalPolicyStasticsMinMapper.updateByPrimaryKey(oldStatistics);              }              updateCount.addAndGet(affectNum);          });          callback.commit();          return updateCount.get();      }        /**       * 版本号过滤器(组装版本信息)       *       * @param oldVersion     老版本信息       * @param currentVersion 当前版本号       * @return 可用的版本信息       */      private String versionKeeperFilter(String oldVersion, String currentVersion) {          String versionFull = oldVersion + "," + currentVersion;          if (versionFull.length() >= 500) {              // 从150以后,第一版本号开始保留              versionFull = versionFull.substring(versionFull.indexOf(',', 150));          }          return versionFull;      }    }

4. 你需要一个启动任务的地方

/**   * 启动时运行的任务调度服务   *   */  @Service  @Slf4j  public class TaskAutoRunScheduleService {        @Resource      private MinuteBizDataCounterTask minuteBizDataCounterTask;        @PostConstruct      public void bizDataAutoRun() {          log.info("============= bizDataAutoRun start =================");          ExecutorService executorService = Executors.newSingleThreadExecutor(new NamedThreadFactory("Biz-data-counter-%d"));          executorService.submit(minuteBizDataCounterTask);      }    }

5. 将每分钟的数据从db查询出来展示到页面

  以上将数据统计后以分钟级汇总到数据,接下来,监控页面就只需从db中进行简单聚合就可以了,咱们就不费那精力去展示了。

6. 待完善的地方

  1. 集群环境的任务运行将会出问题,解决办法是:加一个分布式锁即可。 你可以的!

  2. 针对重试执行统计问题,还得再考虑考虑。(幂等性)

唠叨: 踩坑不一定是坏事!

相关资讯

    暂无相关的资讯...

共有访客发表了评论 网友评论

验证码: 看不清楚?
    -->