Netty(12)自定义RPC框架的通信

news/2024/5/20 0:42:54/文章来源:https://blog.csdn.net/yyuggjggg/article/details/126616346

在之前的聊天项目中,新增 Rpc 请求和响应消息

消息

新增消息

@Data
public abstract class Message implements Serializable {// 省略旧的代码public static final int RPC_MESSAGE_TYPE_REQUEST = 101;public static final int  RPC_MESSAGE_TYPE_RESPONSE = 102;static {// ...messageClasses.put(RPC_MESSAGE_TYPE_REQUEST, RpcRequestMessage.class);messageClasses.put(RPC_MESSAGE_TYPE_RESPONSE, RpcResponseMessage.class);}}

请求消息


@Getter
@ToString(callSuper = true)
public class RpcRequestMessage extends Message {/*** 调用的接口全限定名,服务端根据它找到实现,清楚远程调用哪一个接口*/private String interfaceName;/*** 调用接口中的方法名:远程调用的方法名*/private String methodName;/*** 方法返回类型:远程调用的方法的返回值类型*/private Class<?> returnType;/*** 方法参数类型数组:远程调用的方法的参数类型*/private Class[] parameterTypes;/*** 方法参数值数组:远程调用的方法的实际参数的值*/private Object[] parameterValue;//构造方法:对上面的参数进行赋值public RpcRequestMessage(int sequenceId, String interfaceName, String methodName, Class<?> returnType, Class[] parameterTypes, Object[] parameterValue) {super.setSequenceId(sequenceId);this.interfaceName = interfaceName;this.methodName = methodName;this.returnType = returnType;this.parameterTypes = parameterTypes;this.parameterValue = parameterValue;}@Overridepublic int getMessageType() {return RPC_MESSAGE_TYPE_REQUEST;}
}

响应消息

@Data
@ToString(callSuper = true)
public class RpcResponseMessage extends Message {/*** 返回值*/private Object returnValue;/*** 异常值:远程调用如果出现了错误,便使用这个异常值进行获取*/private Exception exceptionValue;@Overridepublic int getMessageType() {return RPC_MESSAGE_TYPE_RESPONSE;}
}

根据接口名称找到实现

import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;public class ServicesFactory {static Properties properties;static Map<Class<?>, Object> map = new ConcurrentHashMap<>();static {try (InputStream in = Config.class.getResourceAsStream("/application.properties")) {properties = new Properties();properties.load(in);Set<String> names = properties.stringPropertyNames();for (String name : names) {if (name.endsWith("Service")) {Class<?> interfaceClass = Class.forName(name);Class<?> instanceClass = Class.forName(properties.getProperty(name));map.put(interfaceClass, instanceClass.newInstance());}}} catch (IOException | ClassNotFoundException | InstantiationException | IllegalAccessException e) {throw new ExceptionInInitializerError(e);}}public static <T> T getService(Class<T> interfaceClass) {return (T) map.get(interfaceClass);}
}

service方法

package cn.abc.server.service;public interface HelloService {String sayHello(String name);
}
package cn.abc.server.service;public class HelloServiceImpl implements HelloService {@Overridepublic String sayHello(String msg) {return "你好, " + msg;}
}

application.properties

serializer.algorithm=Json
cn.abc.server.service.HelloService=cn.abc.server.service.HelloServiceImpl

序列化

将java的class转换为json字符串

public interface Serializer {// 反序列化方法<T> T deserialize(Class<T> clazz, byte[] bytes);// 序列化方法<T> byte[] serialize(T object);enum Algorithm implements Serializer {Java {@Overridepublic <T> T deserialize(Class<T> clazz, byte[] bytes) {try {ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));return (T) ois.readObject();} catch (IOException | ClassNotFoundException e) {throw new RuntimeException("反序列化失败", e);}}@Overridepublic <T> byte[] serialize(T object) {try {ByteArrayOutputStream bos = new ByteArrayOutputStream();ObjectOutputStream oos = new ObjectOutputStream(bos);oos.writeObject(object);return bos.toByteArray();} catch (IOException e) {throw new RuntimeException("序列化失败", e);}}},Json {@Overridepublic <T> T deserialize(Class<T> clazz, byte[] bytes) {//注册的时候说明是哪一种类型需要这种转换器//现在是Class类型需要这个转换器Gson gson = new GsonBuilder().registerTypeAdapter(Class.class, new Serializer.ClassCodec()).create();String json = new String(bytes, StandardCharsets.UTF_8);return gson.fromJson(json, clazz);}@Overridepublic <T> byte[] serialize(T object) {//注册的时候说明是哪一种类型需要这种转换器//现在是Class类型需要这个转换器Gson gson = new GsonBuilder().registerTypeAdapter(Class.class, new Serializer.ClassCodec()).create();String json = gson.toJson(object);return json.getBytes(StandardCharsets.UTF_8);}}}//将java的哪一个类型(class)转换为jsonclass ClassCodec implements JsonSerializer<Class<?>>, JsonDeserializer<Class<?>> {@Overridepublic Class<?> deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context) throws JsonParseException {try {//拿到字符串形式:类名String str = json.getAsString();//将类名还原为java中的Classreturn Class.forName(str);} catch (ClassNotFoundException e) {throw new JsonParseException(e);}}//将java中的Class变为json@Override            public JsonElement serialize(Class<?> src, Type typeOfSrc, JsonSerializationContext context) {// class -> jsonreturn new JsonPrimitive(src.getName());}}
}

SequenceId

import java.util.concurrent.atomic.AtomicInteger;public abstract class SequenceIdGenerator {private static final AtomicInteger id = new AtomicInteger();public static int nextId() {return id.incrementAndGet();}
}

第一版本(客户端与服务器网络通信)

服务器端

@Slf4j
public class RpcServer {public static void main(String[] args) {NioEventLoopGroup boss = new NioEventLoopGroup();NioEventLoopGroup worker = new NioEventLoopGroup();LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();// rpc 请求消息处理器RpcRequestMessageHandler RPC_HANDLER = new RpcRequestMessageHandler();try {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.channel(NioServerSocketChannel.class);serverBootstrap.group(boss, worker);serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {//处理半包黏包的处理器ch.pipeline().addLast(new ProcotolFrameDecoder());//日志handler的处理器ch.pipeline().addLast(LOGGING_HANDLER);//自定义协议的编解码器ch.pipeline().addLast(MESSAGE_CODEC);//rpc 请求消息处理器ch.pipeline().addLast(RPC_HANDLER);}});Channel channel = serverBootstrap.bind(8080).sync().channel();channel.closeFuture().sync();} catch (InterruptedException e) {log.error("server error", e);} finally {boss.shutdownGracefully();worker.shutdownGracefully();}}
}

服务器端Handler(请求)

@Slf4j
@ChannelHandler.Sharable
public class RpcRequestMessageHandler extends SimpleChannelInboundHandler<RpcRequestMessage> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, RpcRequestMessage message) {RpcResponseMessage response = new RpcResponseMessage();response.setSequenceId(message.getSequenceId());try {//通过接口名称获取对应的实现类对象HelloService service = (HelloService)ServicesFactory.getService(Class.forName(message.getInterfaceName()));//通过实现类对象的方法名找到对应的方法Method method = service.getClass().getMethod(message.getMethodName(), message.getParameterTypes());//获取执行结果Object invoke = method.invoke(service, message.getParameterValue());response.setReturnValue(invoke);} catch (Exception e) {e.printStackTrace();String msg = e.getCause().getMessage();response.setExceptionValue(new Exception("远程调用出错:" + msg));}ctx.writeAndFlush(response);}}

客户端

public class RpcClient {public static void main(String[] args) {NioEventLoopGroup group = new NioEventLoopGroup();LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();// rpc 响应消息处理器RpcResponseMessageHandler RPC_HANDLER = new RpcResponseMessageHandler();try {Bootstrap bootstrap = new Bootstrap();bootstrap.channel(NioSocketChannel.class);bootstrap.group(group);bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {//处理半包黏包的处理器ch.pipeline().addLast(new ProcotolFrameDecoder());//日志handler的处理器ch.pipeline().addLast(LOGGING_HANDLER);//自定义协议的编解码器ch.pipeline().addLast(MESSAGE_CODEC);// rpc 响应消息处理器ch.pipeline().addLast(RPC_HANDLER);}});Channel channel = bootstrap.connect("localhost", 8080).sync().channel();//发送请求消息,开始运行出站处理器(MESSAGE_CODEC、LOGGING_HANDLER)//然后进入服务器端ChannelFuture future = channel.writeAndFlush(new RpcRequestMessage(1,"cn.abc.server.service.HelloService","sayHello",String.class,new Class[]{String.class},new Object[]{"张三"})).addListener(f-> {if (!f.isSuccess()) {//获取异常信息Throwable cause = f.cause();log.error("error", cause);}});channel.closeFuture().sync();} catch (Exception e) {log.error("client error", e);} finally {group.shutdownGracefully();}}
}

客户端Handler(响应)

@Slf4j
@ChannelHandler.Sharable
public class RpcResponseMessageHandler extends SimpleChannelInboundHandler<RpcResponseMessage> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, RpcResponseMessage msg) throws Exception {log.debug("{}", msg);}
}

流程说明

  1. 客户端建立连接

    Channel channel = bootstrap.connect("localhost", 8080).sync().channel()
    
  2. 客户端这边发送一个请求消息

    ChannelFuture future = channel.writeAndFlush(new RpcRequestMessage(1,"cn.abc.server.service.HelloService","sayHello",String.class,new Class[]{String.class},new Object[]{"张三"})).addListener(f-> {if (!f.isSuccess()) {//如果没有成功//获取异常信息Throwable cause = f.cause();log.error("error", cause);}});
    
  3. 客户端的请求消息找到客户端的pipeline的出站处理器(现在客户端只有两个出站处理器)

    ch.pipeline().addLast(LOGGING_HANDLER);
    ch.pipeline().addLast(MESSAGE_CODEC);
  4. 客户端的出站处理器,处理完后,进入服务器端pipeline的入站处理器,对消息进行处理

    ch.pipeline().addLast(new ProcotolFrameDecoder());
    ch.pipeline().addLast(LOGGING_HANDLER);
    ch.pipeline().addLast(MESSAGE_CODEC);
    //将消息交给了请求handler
    ch.pipeline().addLast(RPC_HANDLER);
  5. 服务器端的入站处理器,进入请求handler:该handler拿到rpc的请求消息,通过接口名称得到要正的要调用的实现对象,找到要调用的方法,过后反射进行调用获取到执行的结果,将这个过程中成功或异常信息封装到响应消息里,通过ctx返回

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RpcRequestMessage message) {RpcResponseMessage response = new RpcResponseMessage();response.setSequenceId(message.getSequenceId());try {//通过接口名称获取对应的实现类对象HelloService service = (HelloService)ServicesFactory.getService(Class.forName(message.getInterfaceName()));//通过实现类对象的方法名找到对应的方法Method method = service.getClass().getMethod(message.getMethodName(), message.getParameterTypes());//获取执行结果Object invoke = method.invoke(service, message.getParameterValue());response.setReturnValue(invoke);} catch (Exception e) {e.printStackTrace();String msg = e.getCause().getMessage();response.setExceptionValue(new Exception("远程调用出错:" + msg));}ctx.writeAndFlush(response);
    }
  6. ctx返回的响应消息,要经过服务器端的出站处理器:从上往下找到出站处理器,当前服务器端的两次出站处理器处理完后,便将消息发送到了客户端

    ch.pipeline().addLast(LOGGING_HANDLER);
    ch.pipeline().addLast(MESSAGE_CODEC);
    
  7. 客户端这边收到消息后,做入站处理,当前客户端要结果下面的四次入站处理,到第四次交给客户端的handler

    ch.pipeline().addLast(new ProcotolFrameDecoder());
    ch.pipeline().addLast(LOGGING_HANDLER);
    ch.pipeline().addLast(MESSAGE_CODEC);
    //将消息交给了请求handler
    ch.pipeline().addLast(RPC_HANDLER);
  8. 客户端的handler,接收到消息后,将消息进行了打印

    
    @Slf4j
    @ChannelHandler.Sharable
    public class RpcResponseMessageHandler extends SimpleChannelInboundHandler<RpcResponseMessage> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, RpcResponseMessage msg) throws Exception {log.debug("{}", msg);}
    }
    

第二版本

第一版本是将消息写死了,

ChannelFuture future = channel.writeAndFlush(new RpcRequestMessage(1,"cn.abc.server.service.HelloService","sayHello",String.class,new Class[]{String.class},new Object[]{"张三"})).addListener(promise -> {if (!promise.isSuccess()) {Throwable cause = promise.cause();log.error("error", cause);}});

第一版本中,方法名、方法参数、类型等,是自己去做一些转换封装成了消息对象,带来了使用者的一些工作量

new RpcRequestMessage(1,"cn.abc.server.service.HelloService","sayHello",String.class,new Class[]{String.class},new Object[]{"张三"})

下面第二版本的任务便是将客户端的代码进行改造,其他内容不变

客户端

 
import cn.abc.client.handler.RpcResponseMessageHandler;
import cn.abc.message.RpcRequestMessage;
import cn.abc.protocol.MessageCodecSharable;
import cn.abc.protocol.ProcotolFrameDecoder;
import cn.abc.protocol.SequenceIdGenerator;
import cn.abc.server.service.HelloService;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.concurrent.DefaultPromise;
import lombok.extern.slf4j.Slf4j;import java.lang.reflect.Proxy;@Slf4j
public class RpcClientManager {public static void main(String[] args) {HelloService service = getProxyService(HelloService.class);System.out.println(service.sayHello("zhangsan"));}/* 创建代理类:将service.sayHello("zhangsan")这个方法的调用再转换为rpc消息(RpcRequestMessage);再由当前这个代理类去发送这条消息*/public static <T> T getProxyService(Class<T> serviceClass) {//拿到接口类型的类加载器ClassLoader loader = serviceClass.getClassLoader();//代理要实现的接口的数组Class<?>[] interfaces = new Class[]{serviceClass};// sayHello  "张三"//创建代理类Object o = Proxy.newProxyInstance(loader, interfaces, (proxy, method, args) -> {/*proxy:代理对象method:代理类正在执行的方法args:方法的实际参数数组*/int sequenceId = SequenceIdGenerator.nextId();// 1. 将方法调用转换为 消息对象RpcRequestMessage msg = new RpcRequestMessage(sequenceId,serviceClass.getName(),method.getName(),method.getReturnType(),method.getParameterTypes(),args);// 2. 将消息对象发送出去getChannel().writeAndFlush(msg);// 3. 暂时返回nullreturn null;          });return (T) o;}private static Channel channel = null;private static final Object LOCK = new Object();// 获取唯一的 channel 对象public static Channel getChannel() {if (channel != null) {return channel;}/*如果有两个线程同时进入这个方法,才开始时,二者都判断出当前channel为null;过后,两个线程(t1、t2)同时进入下面的synchronized;t1线程首先抢到锁,往下运行,如果此时是channel确实是null,则触发init方法,返回channel对象;t1线程执行完后,t2线程开始运行,首先还是需要判断当前channel是否为null,现在不为null了,则直接返回channel;*///对线程加锁synchronized (LOCK) { //  t2if (channel != null) { // t1return channel;}//当channel为空,则创建channelinitChannel();return channel;}}// 初始化 channel 方法private static void initChannel() {NioEventLoopGroup group = new NioEventLoopGroup();LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();RpcResponseMessageHandler RPC_HANDLER = new RpcResponseMessageHandler();Bootstrap bootstrap = new Bootstrap();bootstrap.channel(NioSocketChannel.class);bootstrap.group(group);bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new ProcotolFrameDecoder());ch.pipeline().addLast(LOGGING_HANDLER);ch.pipeline().addLast(MESSAGE_CODEC);ch.pipeline().addLast(RPC_HANDLER);}});try {channel = bootstrap.connect("localhost", 8080).sync().channel();//不在使用sync方法在这里阻塞,所以使用异步方式,当channel关闭时触发channel.closeFuture().addListener(future -> {group.shutdownGracefully();});} catch (Exception e) {log.error("client error", e);}}
}

流程说明

  1. 在客户端创建出来一个代理对象,代理对象将方法调用转换为消息发送
HelloService service = getProxyService(HelloService.class);
  1. 当调用代理对象中的任何方法时,

    service.sayHello("zhangsan")
    
  2. 当上面的方法进行调用时,便会进入代理对象的下面代码:zhangsan将会传递为args,开始序列化生成rpc的消息进行发送

    (proxy, method, args) -> {/*proxy:代理对象method:代理类正在执行的方法args:方法的实际参数数组*/int sequenceId = SequenceIdGenerator.nextId();// 1. 将方法调用转换为 消息对象RpcRequestMessage msg = new RpcRequestMessage(sequenceId,serviceClass.getName(),method.getName(),method.getReturnType(),method.getParameterTypes(),args);// 2. 将消息对象发送出去getChannel().writeAndFlush(msg);// 3. 暂时返回nullreturn null;          
    }
    

第三版本

第二版本的客户端代码并没有去接收结果,现在将在第三版本中去接收结果。

第二版本中创建的代理对象内容,都是在主线程中去调用运行的,但是服务器真正将响应消息返回是在:

  • 客户端的RpcResponseMessageHandler里面的channelRead0中获取到并打印的,
  • 这是在netty的Nio的线程里面执行的

现在就是在两个线程中运行,两个线程中通信可以使用Promise

用图说明过程

在这里插入图片描述

  1. t0、t2、t4:主线程的三次方法调用
  2. t1、t3、t5:Nio得到响应消息的线程
  3. 这两个线程之间使用Promise(多个)去交换数据
  4. t0开始调用时,需要准备好一个promise;将来t1拿到结果了,就将结果填到当前t0准备好的这个promise里面
  5. t2和t3;t4和t5类似
  6. 这些promise需要有一个集合去管理,promise之间需要对应起来,这时候就需要一个id(在代码里面是SequenceId)去对应起来

下面还是只是需要修改客户端的代码

客户端

@Slf4j
public class RpcClientManager {//...其他代码不变,这里只是将变化的代码进行说明public static <T> T getProxyService(Class<T> serviceClass) {//拿到接口类型的类加载器ClassLoader loader = serviceClass.getClassLoader();//代理要实现的接口的数组Class<?>[] interfaces = new Class[]{serviceClass};// sayHello  "张三"//创建代理类Object o = Proxy.newProxyInstance(loader, interfaces, (proxy, method, args) -> {/*proxy:代理对象method:代理类正在执行的方法args:方法的实际参数数组*/int sequenceId = SequenceIdGenerator.nextId();// 1. 将方法调用转换为 消息对象RpcRequestMessage msg = new RpcRequestMessage(sequenceId,serviceClass.getName(),method.getName(),method.getReturnType(),method.getParameterTypes(),args);// 2. 将消息对象发送出去getChannel().writeAndFlush(msg);// 3. 准备一个空 Promise 对象,来接收结果// getChannel().eventLoop():指定 promise 对象异步接收结果线程DefaultPromise<Object> promise = new DefaultPromise<>(getChannel().eventLoop());//将id(sequenceId) 和 promise 进行对应RpcResponseMessageHandler.PROMISES.put(sequenceId, promise);
//            promise.addListener(future -> {//getChannel().eventLoop():指定 promise 对象异步接收结果线程//当前这个Listener方法里面与上面的getChannel().eventLoop()将会是同一个线程
//                // 线程
//            });// 4. 等待 promise 结果promise.await();//直到有消息:// 即RpcResponseMessageHandler里面调用了 setFailure() 或 setSuccess()方法promise.await()便会结束等待if(promise.isSuccess()) {// 调用正常:返回promise存放的东西(在RpcResponseMessageHandler里面存放的)return promise.getNow();} else {// 调用失败throw new RuntimeException(promise.cause());}});return (T) o;}//....其他代码不变,这里只是将变化的代码进行说明

客户端Handler

import cn.abc.message.RpcResponseMessage;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.concurrent.Promise;
import lombok.extern.slf4j.Slf4j;import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;@Slf4j
@ChannelHandler.Sharable
public class RpcResponseMessageHandler extends SimpleChannelInboundHandler<RpcResponseMessage> {/*使用map集合去存放所有的promise,ConcurrentHashMap线程安全Integer:        序号Promise<Object>:用来接收结果(结果不确定,Object去接收)的 promise 对象*/public static final Map<Integer, Promise<Object>> PROMISES = new ConcurrentHashMap<>();@Overrideprotected void channelRead0(ChannelHandlerContext ctx, RpcResponseMessage msg) throws Exception {log.debug("{}", msg);// 拿到空的 promise:根据消息序号拿到还没有填充结果的promise//remove:拿到移除后的promise,传递完消息后,这个promise就没有用了,所以使用removePromise<Object> promise = PROMISES.remove(msg.getSequenceId());//如果promise为空,则判断结果是长成还是异常if (promise != null) {//正常结果Object returnValue = msg.getReturnValue();//异常结果Exception exceptionValue = msg.getExceptionValue();//setFailure 和 setSuccess 不管调用哪一个,都会使promise.await()方法结束等待if(exceptionValue != null) {promise.setFailure(exceptionValue);} else {promise.setSuccess(returnValue);}}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_381467.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

springboot系列(二十一):基于AOP实现自定义注解且记录接口日志|超级超级详细,建议收藏

&#x1f468;‍&#x1f393;作者&#xff1a;bug菌 &#x1f6ab;特别声明&#xff1a;原创不易&#xff0c;转载请附上原文出处链接和本文声明&#xff0c;谢谢配合。 &#x1f64f;版权声明&#xff1a;文章里可能部分文字或者图片来源于互联网或者百度百科&#xff0c;如有…

Scss--将node-sass切换为sass(原dart-sass)

原文网址&#xff1a;Scss--将node-sass切换为sass(原dart-sass)_IT利刃出鞘的博客-CSDN博客 简介 说明 本文介绍node-sass与sass&#xff08;原dart-sass&#xff09;&#xff0c;以及如何将node-sass切换为sass&#xff08;原dart-sass&#xff09;。 sass依赖原来是dart-s…

Alibaba之jvm-sandbox初体验

前言 在开始之前&#xff0c;我们先来模拟一下以下的场景&#xff1a; 小李&#xff1a;“小明&#xff0c;你的接口没有返回数据&#xff0c;麻烦帮忙看一下&#xff1f;” 小明&#xff1a;“我这边的数据也是从别人的服务器中拿到的&#xff0c;但是我不确定是因为逻辑处理…

alluxio简单使用

alluxio简单使用 本文是基于alluxio官网和自己实践整理。 Alluxio版本&#xff1a;1.8.1CDH 1.15.2 1、介绍 以内存为中心的分布式虚拟存储系统。Alluxio在上层计算框架和底层存储系统之间架起了桥梁&#xff0c;应用层只需要访问Alluxio即可以访问底层对接了的任意存储系统的…

Oracle索引详解

索引 类似于书的目录&#xff0c;提高查询效率。 创建索引语法&#xff1a; CREATE [UNIQUE] [BITMAP] INDEX 索引名称 ON 表名(字段,[字段,..,..]);名词解释&#xff1a;UNIQUE 唯一索引BITMAP 位图索引默认不写 UNIQUE 和 BITMAP 为普通索引表名后面写多个字段为复合索引在字…

activeMQ、rabbitMQ学习对比心得

一、activemq activemq工作模型比较简单。只有两种模式 queue、topics 。 queue就多对一&#xff0c;producer往queue里发送消息&#xff0c;消费者从queue里取&#xff0c;消费一条&#xff0c;就从queue里移除一条。如果一个消费者消费速度不够快怎么办呢&#xff1f;在act…

About-Flink

About-Flink 一、Flink简介 1.1、flink特点1.2、分层Api1.3、Flink vs Spark Streaming 二、Flink批处理应用 2.1、依赖的引入2.2、准备批处理文件2.3、wordCount编码2.4、自定义类 三、Flink流处理应用 3.1、wordCount编码3.2、设置并行度-默认为43.2、数据来源socket3.3、配…

通过 replace() 和正则实现 将文本中的所有数字颜色高亮

实现的效果&#xff1a; 用到的知识点&#xff1a; replace() 方法用于在字符串中用一些字符替换另一些字符&#xff0c;或替换一个与正则表达式匹配的子串。 repalce&#xff08; a, b &#xff09; 必须传两个值&#xff0c;其中a 是要替换的文本&#xff0c;或者满足条件…

javaweb JAVA JSP球鞋销售系统购物系统ssm购物系统购物商城系统源码(ssm电子商务系统)

JSP球鞋销售系统购物系统ssm购物系统购物商城系统源码&#xff08;ssm电子商务系统&#xff09;

生产和同城存储双活架构下,发生脑裂问题影响数据库读写,如何快速分析问题和解决问题?

数据中心脑裂问题,简单说就是两个数据中心间的网络和存储链路同时发生中断,导致两个数据中心内的应用、数据库或者操作系统同时抢占和利用共享的资源,造成资源的数据不一致,产生重大影响。如何避免脑裂是每个存储双活方案都需要尤为重视的问题,脑裂会带来长时间的存储读写…

linux上redis单机的安装

1. 官网下载 https://github.com/redis/redis/archive/7.0.4.tar.gz 2. 上传到虚拟机/data/目录下、解压 tar -xzvf redis-7.0.4.tar.gz 3. 进入redis-7.0.4此目录 cd redis-7.0.4;ll 4. 安装到指定目录中 a. mkdir /usr/local/redis b. make PREFIX/usr/local/redis inst…

沃尔玛、eBay、wish、新蛋等美系平台对于测评风控点有哪些?怎么解决

很多人把各大平台风控想得过于简单&#xff0c;以为注册一批买家账号配一个IP就能进行下单上评&#xff0c;这也是导致市面上的测评现象杂乱无章。但是一定要明白一点各大电商平台都是一家数据公司他的算法一定是根据市场的变化而不断调整的。 平台检测的方式有很多种 1、平台…

RabbitMQ入门(二)

1.概述 RabbityMQ整体上是一个生产者和消费者模式。生产者生产消息到消息中间件的服务节点&#xff08;Broker&#xff09;,服务节点中包含交换器&#xff08;Exchange&#xff09;和队列&#xff08;Queue&#xff09;&#xff0c;生产的消息首先经过交换器&#xff0c;再由交…

搭建vue3项目

搭建vue3项目搭建准备创建项目选择所需配置运行项目vue3已经被大众所熟悉&#xff0c;很多公司都在做vue2到vue3的升级。 介绍vue3项目的搭建过程 搭建准备 前端开发环境需要node.js&npm node下载地址:http://nodejs.cn/download/ 根据自己电脑环境下载就行 安装vue-cli3…

2022/08/31 day14:企业级解决方案

文章目录目录缓存预热缓存雪崩缓存击穿缓存穿透性能指标监控总结目录 面试问题 [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-EtBtkGNE-1661933471760)(en-resource://database/5507:1)] [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下…

抖音小程序模板全行业整理合集,抖音小程序制作平台分享

小弟我是来自第三方抖音小程序制作平台的打工人&#xff0c;给大家整合了一些我们平台的抖音小程序模板&#xff0c;大家可以根据需要来获取。 步骤就是点击下方的链接&#xff0c;选好自己的抖音小程序模板&#xff0c;在平台注册账号直接套用到自己的抖音小程序上&#xff0…

深入理解蓝牙BLE之“信道管理”

目录 一.BLE的调制解调&#xff1a; 二.BLE的信道&#xff1a; 三.BLE的广播信道&#xff1a; 四.BLE的数据信道&#xff1a; 五.BLE信道管理&#xff1a; 5.1广播信道的随机延时&#xff1a; 5.2数据信道的调频算法&#xff1a; 跳频算法1&#xff1a; 跳频算法2&…

02.Haoop 虚拟机 桥接与NAT之间区别 及桥接设置

首先说 我的硬件准备&#xff0c;1台windows系统&#xff0c;1台mac pro 。 在 物理机上使用了 VMWARE CENTOS 7 的 方式进行配置。 那么我希望能实现把 这2台机器连在一起&#xff0c;做Hadoop 的集群。 网络问题是首先需要解决的事情&#xff0c;主要不通物理主机之间一直…

02:入门及安装(狂神说RabbitMQ)

RabbitMQ入门及安装 https://www.bilibili.com/video/BV1dX4y1V73Gp27 概述 简单概述&#xff1a; RabbitMQ是一个开源的遵循 AMQP协议实现的基于 Erlang语言编写&#xff0c;支持多种客户端&#xff08;语言&#xff09;&#xff0c;用于在分布式系统中存储消息&#xff0…

Spring Security 入门之自定义表单登录开发实现(三)

文章目录1. 前言2. 自定义认证2.1 自定义登录页面2.2 后端认证逻辑3. 自定义登陆成功处理3.1 登陆成功原理3.2 自定义登陆成功响应处理4. 自定义登陆失败处理4.1 登陆失败原理4.2 自定义登陆失败响应处理5. 注销用户处理5.1 注销原理总结1. 前言 在弄懂HelloWorld案例后&#…