项目介绍
核心功能:统一的接口发送各种类型消息,对消息生命周期全链路追踪。
意义:只要公司内部有发送消息的需求,都应该要有类似austin
的项目。消息推送平台对各类消息进行统一发送处理,这有利于对功能的收拢,以及提高业务需求开发的效率。
项目地址:https://github.com/ZhongFuCheng3y/austin
项目拆解
下发消息接口,分为群发和单发。接口参数主要有模板id(发送消息的内容模板),参数[用来替换模板参数,接收人],api可以存在多个,但是具体处理的方法只需要批量参数就可以。
单发的接口请求实体类
public class SendRequest {/*** 执行业务类型* send:发送消息* recall:撤回消息*/private String code;/*** 消息模板Id* 【必填】*/private Long messageTemplateId;/*** 消息相关的参数* 当业务类型为"send",必传*/private MessageParam messageParam;}
群发的接口实体类
public class BatchSendRequest {/*** 执行业务类型* 必传,参考 BusinessCode枚举*/private String code;/*** 消息模板Id* 必传*/private Long messageTemplateId;/*** 消息相关的参数* 必传*/private List<MessageParam> messageParamList;}
单个消息的实体
public class MessageParam {/*** @Description: 接收者* 多个用,逗号号分隔开* 【不能大于100个】* 必传*/private String receiver;/*** @Description: 消息内容中的可变部分(占位符替换)* 可选*/private Map<String, String> variables;/*** @Description: 扩展参数* 可选*/private Map<String, String> extra;
}
消息模板中定义了发送渠道,消息渠道决定消息的处理器。
在AssembleAction
转换实体类。核心逻辑有
- 将模板中的可变参数转化成文本内容
- 根据消息渠道获取消息实体类
- 生成业务编码
AssembleAction#getContentModelValue
,ContentHolderUtil.replacePlaceHolder(originValue, variables);
用来处理可替换变量。
private static ContentModel getContentModelValue(MessageTemplate messageTemplate, MessageParam messageParam) {// 得到真正的ContentModel 类型Integer sendChannel = messageTemplate.getSendChannel();Class<? extends ContentModel> contentModelClass = ChannelType.getChanelModelClassByCode(sendChannel);// 得到模板的 msgContent 和 入参Map<String, String> variables = messageParam.getVariables();JSONObject jsonObject = JSON.parseObject(messageTemplate.getMsgContent());// 通过反射 组装出 contentModelField[] fields = ReflectUtil.getFields(contentModelClass);ContentModel contentModel = ReflectUtil.newInstance(contentModelClass);for (Field field : fields) {String originValue = jsonObject.getString(field.getName());if (StrUtil.isNotBlank(originValue)) {String resultValue = ContentHolderUtil.replacePlaceHolder(originValue, variables);Object resultObj = JSONUtil.isJsonObj(resultValue) ? JSONUtil.toBean(resultValue, field.getType()) : resultValue;ReflectUtil.setFieldValue(contentModel, field, resultObj);}}// 如果 url 字段存在,则在url拼接对应的埋点参数String url = (String) ReflectUtil.getFieldValue(contentModel, LINK_NAME);if (StrUtil.isNotBlank(url)) {String resultUrl = TaskInfoUtils.generateUrl(url, messageTemplate.getId(), messageTemplate.getTemplateType());ReflectUtil.setFieldValue(contentModel, LINK_NAME, resultUrl);}return contentModel;
}
在SendMqAction
负责发送消息,根据austin.mq.pipeline
的值获取不同的消息发送者,有MQ发消息,有EventBus,有SpringEvent。
@Slf4j
@Service
public class SendMqAction implements BusinessProcess<SendTaskModel> {@Autowiredprivate SendMqService sendMqService;@Value("${austin.business.topic.name}")private String sendMessageTopic;@Value("${austin.business.recall.topic.name}")private String austinRecall;@Value("${austin.business.tagId.value}")private String tagId;@Value("${austin.mq.pipeline}")private String mqPipeline;@Overridepublic void process(ProcessContext<SendTaskModel> context) {SendTaskModel sendTaskModel = context.getProcessModel();try {if (BusinessCode.COMMON_SEND.getCode().equals(context.getCode())) {String message = JSON.toJSONString(sendTaskModel.getTaskInfo(), new SerializerFeature[]{SerializerFeature.WriteClassName});sendMqService.send(sendMessageTopic, message, tagId);} else if (BusinessCode.RECALL.getCode().equals(context.getCode())) {String message = JSON.toJSONString(sendTaskModel.getMessageTemplate(), new SerializerFeature[]{SerializerFeature.WriteClassName});sendMqService.send(austinRecall, message, tagId);}} catch (Exception e) {context.setNeedBreak(true).setResponse(BasicResultVO.fail(RespStatusEnum.SERVICE_ERROR));log.error("send {} fail! e:{},params:{}", mqPipeline, Throwables.getStackTraceAsString(e), JSON.toJSONString(CollUtil.getFirst(sendTaskModel.getTaskInfo().listIterator())));}}}
ConsumeServiceImpl#consume2Send
,负责处理消息
@Overridepublic void consume2Send(List<TaskInfo> taskInfoLists) {String topicGroupId = GroupIdMappingUtils.getGroupIdByTaskInfo(CollUtil.getFirst(taskInfoLists.iterator()));for (TaskInfo taskInfo : taskInfoLists) {logUtils.print(LogParam.builder().bizType(LOG_BIZ_TYPE).object(taskInfo).build(), AnchorInfo.builder().ids(taskInfo.getReceiver()).businessId(taskInfo.getBusinessId()).state(AnchorState.RECEIVE.getCode()).build());Task task = context.getBean(Task.class).setTaskInfo(taskInfo);taskPendingHolder.route(topicGroupId).execute(task);}}
Task#run
,负责任务的具体处理
@Overridepublic void run() {// 0. 丢弃消息if (discardMessageService.isDiscard(taskInfo)) {return;}// 1. 屏蔽消息shieldService.shield(taskInfo);// 2.平台通用去重if (CollUtil.isNotEmpty(taskInfo.getReceiver())) {deduplicationRuleService.duplication(taskInfo);}// 3. 真正发送消息if (CollUtil.isNotEmpty(taskInfo.getReceiver())) {handlerHolder.route(taskInfo.getSendChannel()).doHandler(taskInfo);}}
EmailHandler#handler
,邮件处理器专门处理邮件的消息
@Overridepublic boolean handler(TaskInfo taskInfo) {EmailContentModel emailContentModel = (EmailContentModel) taskInfo.getContentModel();MailAccount account = getAccountConfig(taskInfo.getSendAccount());try {File file = StrUtil.isNotBlank(emailContentModel.getUrl()) ? AustinFileUtils.getRemoteUrl2File(dataPath, emailContentModel.getUrl()) : null;String result = Objects.isNull(file) ? MailUtil.send(account, taskInfo.getReceiver(), emailContentModel.getTitle(), emailContentModel.getContent(), true) :MailUtil.send(account, taskInfo.getReceiver(), emailContentModel.getTitle(), emailContentModel.getContent(), true, file);} catch (Exception e) {log.error("EmailHandler#handler fail!{},params:{}", Throwables.getStackTraceAsString(e), taskInfo);return false;}return true;}
ShieldServiceImpl#shield
,发送消息判断是否需要白天屏蔽。将消息存储到Redis中,开启xxl-job从redis中获取数据。
@Overridepublic void shield(TaskInfo taskInfo) {if (ShieldType.NIGHT_NO_SHIELD.getCode().equals(taskInfo.getShieldType())) {return;}/*** example:当消息下发至austin平台时,已经是凌晨1点,业务希望此类消息在次日的早上9点推送* (配合 分布式任务定时任务框架搞掂)*/if (isNight()) {if (ShieldType.NIGHT_SHIELD.getCode().equals(taskInfo.getShieldType())) {logUtils.print(AnchorInfo.builder().state(AnchorState.NIGHT_SHIELD.getCode()).businessId(taskInfo.getBusinessId()).ids(taskInfo.getReceiver()).build());}if (ShieldType.NIGHT_SHIELD_BUT_NEXT_DAY_SEND.getCode().equals(taskInfo.getShieldType())) {redisUtils.lPush(NIGHT_SHIELD_BUT_NEXT_DAY_SEND_KEY, JSON.toJSONString(taskInfo,SerializerFeature.WriteClassName),(DateUtil.offsetDay(new Date(), 1).getTime() / 1000) - DateUtil.currentSeconds());logUtils.print(AnchorInfo.builder().state(AnchorState.NIGHT_SHIELD_NEXT_SEND.getCode()).businessId(taskInfo.getBusinessId()).ids(taskInfo.getReceiver()).build());}taskInfo.setReceiver(new HashSet<>());}}/*** 小时 < 8 默认就认为是凌晨(夜晚)** @return*/private boolean isNight() {return LocalDateTime.now().getHour() < 8;}
总结一下
- 下发消息接口,分为群发和单发。接口参数主要有模板id(发送消息的内容模板),模板中定义了消息渠道(消息类型决定消息的处理器),参数[用来替换模板参数,接收人],api可以存在多个,但是具体处理的方法只需要批量参数就可以。
- 根据模板组装数据,替换变量。
- 消息发送,使用监听器或者消息队列,进行异步解耦。消息发送的业务和消息接收的业务拆解开来。
- 根据发送渠道获取对应的消息处理器(用邮件还是短信),使用策略模式进行不同的消息渠道拆分,用具体的消息处理器进行处理消息。