Spring boot装载模板代码
- 涉及的子模块及准备
- 省心Clickhouse批量写
- JSON多层级数据自动映射值
- 模板代码生成及移交控制权给Spring IOC
涉及的子模块及准备
最近比较有空,之前一直好奇,提交到线上考试的代码是如何执行测试的,在实现了基础的demo后,进一步希望加载到Spring上支持动态执行! 经过一段时间琢磨之后,终于完成了基础版本,其中也结合了近来自己封装的消组件,省心Clickhouse批量写、JSON多层级数据自动映射值。
省心Clickhouse批量写
在刚刚使用Clickhouse大批量写入时,经常会出现Clickhouse机器cpu飙升导致查询不可用等情况,在研究了Clickhouse JDBC的官网说明文档以及Clickhouse文档(链接: CK官方文档)后,明白Clickhouse写入方式支持多种,直接使用MyBatis-Plus的批量写入方式就会出现cpu问题,还有一种格式(结构化写入,类似于HBase文件块写入),但是需要知道表字段和类型;
// 消耗比较小的批量写入方式
insert into table_name select %s from input('%s');
第三种写入方式是基于JSON格式(csv等都支持,参考Ck文档的Formats for Input and Output Data部分)写入,Clickhouse会自动匹配表字段和数据的键值对,非空字段严格校验之外,其他字段都会根据数据填充对应的值。
注意:不能做数据更新,只做插入处理,忽略已存在的排序主键数据,更新数据混杂在新增数据时只执行插入数据
//
INSERT INTO afanti_aweme_info_all FORMAT JSONEachRow JSON1\nJSON2
目前只实现第二种写入方式,基本做到了自动生成input 字段及其格式,关键代码如下:
public interface MyIService<T> extends IService<T> {void saveBatchRecordsByInput(List<T> records) throws Exception;}
public class MyServiceImpl<M extends MyBaseMapper<T>, T> extends ServiceImpl<M , T> implements MyIService<T> {protected Log log = LogFactory.getLog(this.getClass());protected Class<M> mapperClass = this.currentMapperClass();protected Class<T> entityClass = this.currentModelClass();@Autowiredprivate SqlSessionTemplate sqlSessionTemplate;@Autowiredprotected M myBaseMapper;protected Logger LOGGER = LoggerFactory.getLogger(this.getClass());private final ConcurrentHashMap<String, String> sqlMap = new ConcurrentHashMap<>();private final ConcurrentHashMap<String, List<String>> columnMap = new ConcurrentHashMap<>();@Overridepublic void saveBatchRecordsByInput(List<T> records) throws Exception {String key = this.mapperClass.getSimpleName() + ".saveBatchRecordsByInput";TableName tableNameAnn = this.entityClass.getAnnotation(TableName.class);// _all结束代表ck的分布式表,此处需要获取本地表的字段和类型String tableName = tableNameAnn.value().endsWith("_all") ? tableNameAnn.value().substring(0, tableNameAnn.value().length() - 4) : tableNameAnn.value();if (!sqlMap.containsKey(key)) {buildSql(tableName, key);}String mapperSql = sqlMap.get(key);Connection connection = getConnection();PreparedStatement ps = connection.prepareStatement(mapperSql);try {List<String> fieldNames = columnMap.get(key);for (T record : records) {int i = 1;for (String column : fieldNames) {Field field;try {field = entityClass.getDeclaredField(column);} catch (NoSuchFieldException e) {field = entityClass.getSuperclass().getDeclaredField(column);}field.setAccessible(true);Object val = field.get(record);// 数组类型处理if (val instanceof String[]) {ps.setArray(i, connection.createArrayOf("String", (String[]) val));} else if (val instanceof Long[]) {ps.setArray(i, connection.createArrayOf("Long", (Long[]) val));} else if (val instanceof Integer[]) {ps.setArray(i, connection.createArrayOf("Integer", (Integer[]) val));} else if (val instanceof Date) {// 特殊字段格式处理if ("statisticsDay".equals(column)) {ps.setObject(i, DateUtils.format((Date) val, DateUtils.SDF_YYYY_MM_DD));} else {ps.setObject(i, DateUtils.format((Date) val));}} else {ps.setObject(i, val);}i++;}ps.addBatch();}ps.executeBatch();ps.clearBatch();} finally {connection.close();}}private synchronized void buildSql(String tableName, String key) throws SQLException {Connection connection = getConnection();PreparedStatement ps = connection.prepareStatement(String.format(query, tableName, "rawdata"));ResultSet set = ps.executeQuery();StringJoiner columns = new StringJoiner(",", "", "");StringJoiner columnAndTypes = new StringJoiner(",", "", "");List<String> columsList = new ArrayList<>();while (set.next()) {String column = set.getString("col_name");String dataType = set.getString("data_type");columns.add(column);columnAndTypes.add(column + " " + dataType);columsList.add(toHumpString(column));}connection.close();// 写入数据时需要写入到分布式表,分布式表根据分布式键规则把数据分发到对应机器的本地表上存储String querySt = String.format(querySQL, tableName + "_all", columns.toString(), columnAndTypes.toString());sqlMap.putIfAbsent(key, querySt);columnMap.putIfAbsent(key, columsList);}private static String toHumpString(String string) {StringBuilder stringBuilder = new StringBuilder();String[] str = string.split("_");for (String string2 : str) {if(stringBuilder.length() == 0){stringBuilder.append(string2);}else {stringBuilder.append(string2.substring(0, 1).toUpperCase());stringBuilder.append(string2.substring(1));}}return stringBuilder.toString();}String querySQL = "insert into %s select %s from input('%s') ";// 获取连接池中的链接public Connection getConnection() {Connection conn = null;try {SqlSession sqlSession = sqlSessionTemplate.getSqlSessionFactory().openSession();conn = sqlSession.getConfiguration().getEnvironment().getDataSource().getConnection();} catch (Exception e) {LOGGER.error("Clickhouse getConnection:{}", e.getMessage());e.printStackTrace();}return conn;}String query = "select name as col_name, `type` as data_type from system.columns where table = '%s' and database = '%s' order by position asc";
}
扩展一下Mybatis-plus的模板方法并重写一段生成input的字段和类型字符串逻辑即可。
JSON多层级数据自动映射值
封装了Json-path包,主要内容录如下
<dependency><groupId>com.jayway.jsonpath</groupId><artifactId>json-path</artifactId><version>2.6.0</version></dependency>
封装未处理格式属性,需要自行处理,代码如下:
public class JsonPathParseUtil {public static Configuration configuration = Configuration.builder().options(Option.DEFAULT_PATH_LEAF_TO_NULL, Option.SUPPRESS_EXCEPTIONS).build();public static Date parseDate(String dateStr) {String parsedDate;if (dateStr.length() == 10) {parsedDate = "yyyy-MM-dd";} else {parsedDate = "yyyy-MM-dd HH:mm:ss";}try {return org.apache.commons.lang3.time.DateUtils.parseDate(dateStr, parsedDate);} catch (ParseException e) {e.printStackTrace();}return null;}public static <T> T Json2DTO(String msgString, Class<T> clazz) {try {T dto = clazz.newInstance();ReadContext ctx = JsonPath.parse(msgString, configuration);Field[] fields = clazz.getDeclaredFields();for (Field field : fields) {JPath path = field.getAnnotation(JPath.class);if (path != null) {Object obj = ctx.read(path.value());field.setAccessible(true);if (obj != null) {if (obj instanceof String) {String value = (String) obj;if (StrUtil.isBlank(value)) {field.set(dto, null);} else if (field.getType().equals(Long.class)) {field.set(dto, Long.parseLong(value));} else if (field.getType().equals(Integer.class)) {field.set(dto, Integer.parseInt(value));} else if (field.getType().equals(Date.class)) {field.set(dto, parseDate(value));} else {field.set(dto, value);}} else if (obj instanceof Map) {Map value = (Map) obj;Object v = JSONObject.toJavaObject(new JSONObject(value), field.getType());field.set(dto, v);} else if (obj instanceof Integer) {Integer value = (Integer) obj;if (field.getType().equals(Long.class)) {field.set(dto, value.longValue());} else if (field.getType().equals(Date.class)) {if (value.toString().length() == 10) {field.set(dto, new Date(value.longValue() * 1000));} else if (value.toString().length() == 13) {field.set(dto, new Date(value.longValue()));}} else if (field.getType().equals(String.class)) {field.set(dto, value.toString());} else if (field.getType().equals(Float.class)) {field.set(dto, Float.valueOf(value));} else if (field.getType().equals(Double.class)) {field.set(dto, Double.valueOf(value));} else {field.set(dto, value);}} else if (obj instanceof Long) {Long value = (Long) obj;if (field.getType().equals(Integer.class)) {field.set(dto, value.intValue());} else if (field.getType().equals(Date.class)) {if (value.toString().length() == 10) {field.set(dto, new Date(value * 1000));} else if (value.toString().length() == 13) {field.set(dto, new Date(value));}} else if (field.getType().equals(String.class)) {field.set(dto, value.toString());} else {field.set(dto, value);}} else if (obj instanceof Double) {Double value = (Double) obj;if (field.getType().equals(String.class)) {field.set(dto, value.toString());} else if (field.getType().equals(Float.class)) {field.set(dto, value.floatValue());} else {field.set(dto, value);}} else if (obj instanceof Date) {Date value = (Date) obj;field.set(dto, value);} else if (obj instanceof JSONArray) {JSONArray value = (JSONArray) obj;if (field.getType().equals(String.class)) {field.set(dto, value.toJSONString());} else {Type genericType = field.getGenericType();if (genericType instanceof ParameterizedType) {ParameterizedType pt = (ParameterizedType) genericType;// 得到泛型里的class类型对象Class<?> actualTypeArgument = (Class<?>) pt.getActualTypeArguments()[0];List values = com.alibaba.fastjson.JSONArray.parseArray(value.toJSONString(), actualTypeArgument);field.set(dto, values);} else {field.set(dto, value);}}} else if (obj instanceof List) {List value = (List) obj;field.set(dto, value);} else if (obj instanceof JSONObject) {JSONObject value = (JSONObject) obj;if (field.getType().equals(String.class)) {field.set(dto, value.toJSONString());} else {field.set(dto, value);}} else if (obj instanceof Boolean) {Boolean value = (Boolean) obj;if (field.getType().equals(Integer.class)) {field.set(dto, value ? 1 : 0);} else {field.set(dto, value);}} else if (obj instanceof BigDecimal) {BigDecimal value = (BigDecimal) obj;if (field.getType().equals(String.class)) {field.set(dto, value.toString());} else if (field.getType().equals(Double.class)) {field.set(dto, value.doubleValue());} else if (field.getType().equals(BigDecimal.class)) {field.set(dto, value);} else if (field.getType().equals(Float.class)) {field.set(dto, value.floatValue());}}}}}return dto;} catch (IllegalAccessException | InstantiationException iae) {iae.printStackTrace();}return null;}}
@JPath
@Target({ElementType.FIELD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface JPath {String value() default "";String format() default "";}
模板代码生成及移交控制权给Spring IOC
关键代码如下,用模板代码生成对应处理逻辑的代码的字符串,包装成java运行中的内存文件,获取编译器,把内存文件表示的数据加载到编译任务队列,执行编译,返回Class对象,到这一步,就是编程线上代码考试的逻辑,你提交自己的代码到远程服务器上编译,用已经准备好的测试数据反射执行你的方法验证代码逻辑符不符合变成要求。
接上一步,获取Spring运行环境上下文,BeanDefinitionBuilder加载Class类,配置初始化设置,设置Bean的名称,注册BeanDefinition,通过ApplicationContext以及Bean的名称调用Bean即可。
@Data
@Component
@ConfigurationProperties("rocket-config")
@Slf4j
public class CodeRunner implements CommandLineRunner {private List<Mq> mq;private final String LISTENER_CODE ="import com.aliyun.openservices.ons.api.Action;\n" +"import com.aliyun.openservices.ons.api.ConsumeContext;\n" +"import com.aliyun.openservices.ons.api.Message;\n" +"import com.aliyun.openservices.ons.api.MessageListener;\n" +"import com.afanticar.transform.util.JsonPathParseUtil;\n" +"import com.afanti.datastreamline.utils.SpringUtils;\n" +"import java.util.ArrayList;\n" +"import java.util.List;\n" +"import org.slf4j.Logger;\n" +"import org.slf4j.LoggerFactory;\n" +"import java.util.Date;\n" +"import java.sql.SQLException;\n" +"import java.lang.reflect.Field;\n" +"import com.afanti.datastreamline.service.AfantiService;\n" +"\n" +"@SuppressWarnings(\"unchecked\")" +"public class AfantiDouyinDataMessageListener implements MessageListener {\n" +"\n" +" private final Logger LOGGER = LoggerFactory.getLogger(this.getClass());\n" +"\n" +" @Override\n" +" public Action consume(Message message, ConsumeContext context) {\n" +" String msgString = new String(message.getBody());\n" +" \n" +" System.out.println(\"接收到消息\" + msgString);\n" +" Object afantiDouyinAwemeInfo = SpringUtils.getBean(\"afantiDouyinAwemeInfo\");\n" +" afantiDouyinAwemeInfo = JsonPathParseUtil.Json2DTO(msgString, afantiDouyinAwemeInfo.getClass());\n" +" try {\n" +" Field field = afantiDouyinAwemeInfo.getClass().getDeclaredField(\"ctime\");\n" +" field.setAccessible(true);\n" +" field.set(afantiDouyinAwemeInfo, new Date());\n" +" } catch (NoSuchFieldException | IllegalAccessException e) {\n" +" e.printStackTrace();\n" +" }" +" List records = new ArrayList();\n" +" records.add(afantiDouyinAwemeInfo);\n" +" AfantiService afantiService = (AfantiService) SpringUtils.getBean(\"afantiService\");\n" +" try {\n" +" afantiService.saveBatchRecordsByInput(\"afanti_aweme_info_all\", records);\n" +" } catch (SQLException exception) {\n" +" exception.printStackTrace();\n" +" }\n" +" return Action.CommitMessage;\n" +" }\n" +" \n" +"}";private final String CONSUMER_CODE ="import com.aliyun.openservices.ons.api.MessageListener;\n" +"import com.aliyun.openservices.ons.api.PropertyKeyConst;\n" +"import com.aliyun.openservices.ons.api.bean.ConsumerBean;\n" +"import com.aliyun.openservices.ons.api.bean.Subscription;\n" +"import com.afanti.datastreamline.utils.SpringUtils;\n" +"import java.util.HashMap;\n" +"import java.util.Map;\n" +"import java.util.Properties;\n" +"import com.afanti.datastreamline.config.MqProperties;\n" +"\n" +"public class AfantiDouyinDataConsumer extends ConsumerBean {\n" +"\n" +" public void initConsumer() {\n" +" //配置文件\n" +" MqProperties mqConfig = (MqProperties) SpringUtils.getBean(\"mqProperties\");\n" +" Properties properties = mqConfig.getMqPropertie();\n" +" // System.out.println(mqConfig.print());\n" +" properties.setProperty(PropertyKeyConst.GROUP_ID, \"GID_AFANTI_CHIN_SURVEY\");\n" +" //将消费者线程数固定为20个 20为默认值\n" +" properties.setProperty(PropertyKeyConst.ConsumeThreadNums, \"15\");\n" +" properties.setProperty(PropertyKeyConst.MaxCachedMessageAmount,\"1000\");\n" +" this.setProperties(properties);\n" +" //订阅关系\n" +" Map<Subscription, MessageListener> subscriptionTable = new HashMap<Subscription, MessageListener>();\n" +" Subscription subscription = new Subscription();\n" +" subscription.setTopic(\"AFANTI_CHIN_SURVEY\");\n" +" MessageListener afantiDouyinDataMessageListener = (MessageListener) SpringUtils.getBean(\"afantiDouyinDataMessageListener\");\n" +" subscriptionTable.put(subscription, afantiDouyinDataMessageListener);\n" +" //订阅多个topic如上面设置\n" +" this.setSubscriptionTable(subscriptionTable);\n" +" this.start();\n" +" }\n" +"}";private final String KAFKA_TEST = "import lombok.extern.slf4j.Slf4j;\n" +"import org.apache.kafka.clients.consumer.ConsumerRecord;\n" +"import org.springframework.beans.factory.annotation.Autowired;\n" +"import org.springframework.kafka.annotation.KafkaListener;\n" +"import org.springframework.stereotype.Component;\n" +"import org.springframework.kafka.support.Acknowledgment;\n\n" +"import java.util.List;\n" +"\n" +"/**\n" +" * @author Data\n" +" */\n" +"@Slf4j\n" +"@Component\n" +"public class AfantiMessageListener {\n" +"\n" +" @KafkaListener(\n" +" topics = \"AFANTI_CHIN_DEV\",\n" +" containerFactory = \"kafkaListenerContainerFactory\",\n" +" groupId = \"chin-test\")\n" +" public void kafkaListener(List<ConsumerRecord<String, String>> messages, Acknowledgment ack) throws Exception {\n" +" System.out.println(messages.get(0));\n" +" ack.acknowledge();\n" +" }\n" +"\n" +"}";private final String COLUMNS = "{\"afanti_douyin_aweme_info\":{\"aweme_id\":{\"type\":\"String\",\"path\":\"item_id\"},\"aweme_title\":{\"type\":\"String\",\"path\":\"title\"},\"cover\":{\"type\":\"String\",\"path\":\"aweme_cover\"},\"ctime\":{\"type\":\"Date\",\"path\":\"c\"}}}";@Overridepublic void run(String... args) {try {compilerAndRegister(DtoFreemarkerUtil.buildCode(COLUMNS), "AfantiDouyinAwemeInfo", null, true);log.info("DTO、Entity代码构建、装载和初始化完成...");compilerAndRegister(LISTENER_CODE, "AfantiDouyinDataMessageListener", null, false);log.info("Listener代码构建、装载完成...");compilerAndRegister(CONSUMER_CODE, "AfantiDouyinDataConsumer", "initConsumer", false);log.info("RocketMq Consumer代码构建、装载完成...");log.info("RocketMq Consumer初始化完成...");compilerAndRegister(KAFKA_TEST, "AfantiMessageListener", null, false);} catch (URISyntaxException | TemplateException | IOException e) {e.printStackTrace();}}@Datastatic public class Mq {private String gid;private String topic;private String platform;}private void compilerAndRegister(String code, String clazzName, String initMethod, boolean needObject) throws URISyntaxException {JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();StandardJavaFileManager standardFileManager = compiler.getStandardFileManager(null, null, null);ClassJavaFileManager classJavaFileManager = new ClassJavaFileManager(standardFileManager);StringObject stringObject = new StringObject(new URI( clazzName + ".java"), JavaFileObject.Kind.SOURCE, code);// 加入编译任务队列JavaCompiler.CompilationTask task = compiler.getTask(null, classJavaFileManager, null, null, null,Collections.singletonList(stringObject));Class clazz = null;Object entityObj = null;if (task.call()) {ClassJavaFileObject javaFileObject = classJavaFileManager.getClassJavaFileObject();// 获取AppClassloader加载器ClassLoader classLoader = new MyClassLoader(javaFileObject);try {clazz = classLoader.loadClass(clazzName);if (needObject) {entityObj = clazz.newInstance();}} catch (ClassNotFoundException | IllegalAccessException | InstantiationException e) {e.printStackTrace();}}ApplicationContext ctx = SpringUtils.getApplicationContext();// Spring Ioc Bean工厂DefaultListableBeanFactory defaultListableBeanFactory = (DefaultListableBeanFactory) ctx.getAutowireCapableBeanFactory();assert clazz != null;// Bean属性等定义器BeanDefinitionBuilder beanDefinitionBuilder;if (needObject) {assert entityObj != null;beanDefinitionBuilder = BeanDefinitionBuilder.genericBeanDefinition(entityObj.getClass());} else {beanDefinitionBuilder = BeanDefinitionBuilder.genericBeanDefinition(clazz);}// 设置初始化配置 RocketMQ Consumerif (initMethod != null) {beanDefinitionBuilder.setInitMethodName(initMethod);beanDefinitionBuilder.setDestroyMethodName("shutdown");}beanDefinitionBuilder.setLazyInit(false);String beanName = clazzName.substring(0,1).toLowerCase(Locale.ROOT) + clazzName.substring(1);defaultListableBeanFactory.registerBeanDefinition(beanName, beanDefinitionBuilder.getBeanDefinition());ctx.getBean(beanName);}static class ClassJavaFileManager extends ForwardingJavaFileManager {private ClassJavaFileObject classJavaFileObject;public ClassJavaFileManager(JavaFileManager fileManager) {super(fileManager);}public ClassJavaFileObject getClassJavaFileObject() {return classJavaFileObject;}/**读取Class文件字节流*/@Overridepublic JavaFileObject getJavaFileForOutput(JavaFileManager.Location location, String className,JavaFileObject.Kind kind, FileObject sibling) {return (classJavaFileObject = new ClassJavaFileObject(className,kind));}}/**存储源文件*/static class StringObject extends SimpleJavaFileObject {private final String content;public StringObject(URI uri, Kind kind, String content) {super(uri, kind);this.content = content;}//使JavaCompiler可以从content获取java源码@Overridepublic CharSequence getCharContent(boolean ignoreEncodingErrors) {return this.content;}}/**class文件(不需要存到文件中)*/static class ClassJavaFileObject extends SimpleJavaFileObject {ByteArrayOutputStream outputStream;public ClassJavaFileObject(String className, Kind kind) {super(URI.create(className + kind.extension), kind);this.outputStream = new ByteArrayOutputStream();}@Overridepublic OutputStream openOutputStream() {return this.outputStream;}//获取输出流为byte[]数组public byte[] getBytes(){return this.outputStream.toByteArray();}}/**自定义classloader*/static class MyClassLoader extends ClassLoader {private final ClassJavaFileObject stringObject;public MyClassLoader(ClassJavaFileObject stringObject){this.stringObject = stringObject;}@Overrideprotected Class<?> findClass(String name) {byte[] bytes = this.stringObject.getBytes();return defineClass(name,bytes,0,bytes.length);}}}
以上代码参数配置以常量显示,线上环境跟Demo有点差别,下次会展示最终的线上代码,正在修修补补中…
PS.参考文档已忘…