RPC gRPC

RPC 远程服务调用

主要内容

  1. 项目结构变化
  2. RPC简介
  3. RMI实现RPC
  4. HttpClient实现RPC
  5. Zookeeper安装
  6. Zookeeper客户端常用命令
  7. 向Zookeeper中注册内容
  8. 从Zookeeper中发现内容
  9. 手写RPC框架

二、项目架构变化

1     单体架构

1.1   架构图

单体架构就是一个项目里面包含这个项目中全部代码。一个应用搞定全部功能。

DNS 服务器可以是单映射,也可以配置多个映射。

1.2   软件代码结构

在单体架构项目中,团队都是通过包(package)进行区分每个模块。

总体包结构:com.msb.*.分层包。

项目名:
  -- com
    --msb
      -- common
        -- utils
      --user
        -- controller
        -- service
        -- mapper
      -- sys
        -- controller
        -- service
        -- mapper

1.3   优缺点

1.3.1    优点

部署简单

维护方便

成本低

1.3.2    缺点

当项目规模大、用户访问频率高、并发量大、数据量大时,会大大降低程序执行效率,甚至出现服务器宕机等情况。

1.4   适用项目

传统管理项目,小型互联网项目。

2     分布式架构

2.1   架构图(简易版)

分布式架构会把一个项目按照特定要求(多按照模块或功能)拆分成多个项目,每个项目分别部署到不同的服务器上。

2.2   软件代码结构
项目1:
  --com.msb.xxx
    -- controller
    -- service
    -- mapper
项目2
  --com.msb.mmm
    -- controller
    -- service
    -- mapper

2.3   优缺点

2.3.1    优点

增大了系统可用性。减少单点故障,导致整个应用不可用。

增加重用性。因为模块化,所以重用性更高。

增加可扩展性。有新的模块增加新的项目即可。

增加每个模块的负载能力。因为每个模块都是一个项目,所以每个模块的负载能力更强。

2.3.2    缺点

成本更高。

架构更加复杂。

整体响应之间变长,一些业务需要多项目通信后给出结果。

吞吐量更大。吞吐量= 请求数/秒。

2.4   待解决问题

分布式架构中各个模块如何进行通信?

可以使用Http协议,也可以使用RPC协议通信,也可以使用其他的通信方式。我们本阶段使用的是RPC协议,因为它比HTTP更适合项目内部通信。

三、RPC简介

1     RFC

RFC(Request For Comments) 是由互联网工程任务组(IETF)发布的文件集。文件集中每个文件都有自己唯一编号,例如:rfc1831。目前RFC文件由互联网协会(Internet Society,ISOC)赞助发型。

RPC就收集到了rfc 1831中。可以通过下面网址查看:

https://datatracker.ietf.org/doc/rfc1831/

2     RPC

RPC在rfc 1831中收录 ,RPC(Remote Procedure Call) 远程过程调用协议

RPC协议规定允许互联网中一台主机程序调用另一台主机程序,而程序员无需对这个交互过程进行编程。在RPC协议中强调当A程序调用B程序中功能或方法时,A是不知道B中方法具体实现的。

RPC是上层协议,底层可以基于TCP协议,也可以基于HTTP协议。一般我们说RPC都是基于RPC的具体实现,如:Dubbo框架。从广义上讲只要是满足网络中进行通讯调用都统称为RPC,甚至HTTP协议都可以说是RPC的具体实现,但是具体分析看来RPC协议要比HTTP协议更加高效,基于RPC的框架功能更多。

RPC协议是基于分布式架构而出现的,所以RPC在分布式项目中有着得天独厚的优势。

3     RPC和HTTP对比

3.1   具体实现

RPC:可以基于TCP协议,也可以基于HTTP协议。

HTTP:基于HTTP协议

3.2   效率

RPC:自定义具体实现可以减少很多无用的报文内容,使得报文体积更小。

HTTP:如果是HTTP 1.1 报文中很多内容都是无用的。如果是HTTP2.0以后和RPC相差不大,比RPC少的可能就是一些服务治理等功能。

3.3   连接方式

RPC:长连接支持。

HTTP:每次连接都是3次握手。

3.4   性能

RPC可以基于很多序列化方式。如:thrift

HTTP 主要是通过JSON,序列化和反序列效率更低。

3.5   注册中心

RPC :一般RPC框架都带有注册中心。·

HTTP:都是直连。

3.6   负载均衡

RPC:绝大多数RPC框架都带有负载均衡测量。

HTTP:一般都需要借助第三方工具。如:nginx

3.7   综合结论

RPC框架一般都带有丰富的服务治理等功能,更适合企业内部接口调用。而HTTP更适合多平台之间相互调用。

四、HttpClient实现RPC

1     HttpClient简介

在JDK中java.net包下提供了用户HTTP访问的基本功能,但是它缺少灵活性或许多应用所需要的功能。

HttpClient起初是Apache Jakarta Common 的子项目。用来提供高效的、最新的、功能丰富的支持 HTTP 协议的客户端编程工具包,并且它支持 HTTP 协议最新的版本。2007年成为顶级项目。

通俗解释:HttpClient可以实现使用Java代码完成标准HTTP请求及响应。

2     代码实现

2.1   服务端

新建项目HttpClientServer

2.1.1    新建控制器

com.msb.controller.DemoController

@Controller
public class DemoController {
    @RequestMapping("/demo")
    @ResponseBody
    public String demo(String param){
        return "demo"+param;
    }
}
2.1.2    新建启动器

新建启动器

com.msb.HttpClientServerApplication

@SpringBootApplication
public class HttpClientServerApplication {
    public static void main(String[] args) {
    SpringApplication.run(HttpClientServerApplication.class,args);
    }
}

2.2   客户端

新建HttpClientDemo项目

2.2.1    添加依赖
<dependencies>
    <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpclient</artifactId>
        <version>4.5.10</version>
    </dependency>
</dependencies>
2.2.2    新建类

新建com.msb.HttpClientDemo,编写主方法。

2.2.2.1 使用GET方法访问
public static void main(String[] args) {
    try {
    	//创建http工具(理解成:浏览器) 发起请求,解析响应
        CloseableHttpClient httpClient = HttpClients.createDefault();
        //请求路径
        URIBuilder uriBuilder = new URIBuilder("http://localhost:8080/demo");
        uriBuilder.addParameter("param", "get123");
        //创建HttpGet请求对象
        HttpGet get = new HttpGet(uriBuilder.build());
        //创建响应对象
        CloseableHttpResponse response = httpClient.execute(get);
        //由于响应体是字符串,因此把HttpEntity类型转换为字符串类型,并设置字符编码
        String result = EntityUtils.toString(response.getEntity(), "utf-8");
        //输出结果
        System.out.println(result);
        //释放资源
        response.close();
        httpClient.close();
    } catch (URISyntaxException e) {
        e.printStackTrace();
    } catch (IOException e) {
        e.printStackTrace();
    }
}
2.2.2.2 使用POST方式访问
public class HttpClientDemo {
    public static void main(String[] args) {
        try {
        	//创建http工具(理解成:浏览器) 发起请求,解析响应
            CloseableHttpClient httpClient = HttpClients.createDefault();
            //创建HttpPOST请求对象
            HttpPost post = new HttpPost("http://localhost:8080/demo");
            //所有请求参数
            List<NameValuePair> params = new ArrayList<>();
            params.add(new BasicNameValuePair("param","123"));
            //创建HttpEntity接口的文本实现类的对象,放入参数并设置编码
            HttpEntity httpEntity = new UrlEncodedFormEntity(params,"utf-8");
            //放入到HttpPost对象中
            post.setEntity(httpEntity);            
            //创建响应对象
            CloseableHttpResponse response = httpClient.execute(post);
            //由于响应体是字符串,因此把HttpEntity类型转换为字符串类型
            String result = EntityUtils.toString(response.getEntity());
            //输出结果
            System.out.println(result);
            //释放资源
            response.close();
            httpClient.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

3. Jackson用法

3.1   把对象转换为json字符串
ObjectMapper objectMapper = new ObjectMapper();
People peo = new People();
objectMapper.writeValueAsString(peo);
3.2   把json字符串转换为对象
ObjectMapper objectMapper = new ObjectMapper();
People peo = objectMapper.readValue(content, People.class);
3.3   把json字符串转换为List集合
ObjectMapper objectMapper = new ObjectMapper();
JavaType javaType = objectMapper.getTypeFactory().constructParametricType(List.class, People.class);
List<People> list = objectMapper.readValue(content, javaType);

4     HttpClient请求包含JSON

4.1   java代码实现
public class HttpClientDemo {
    public static void main(String[] args) {
        try {
            CloseableHttpClient httpClient = HttpClients.createDefault();
            HttpPost post = new HttpPost("http://localhost:8080/demo");
            HttpEntity httpEntity= null;
String json = "{}";
            StringEntity entity = new StringEntity(json, ContentType.APPLICATION_JSON);
            post.setEntity(entity);
            CloseableHttpResponse response = httpClient.execute(post);
            String result = EntityUtils.toString(response.getEntity());
            System.out.println(result);
            response.close();
            httpClient.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

5     控制器接口参数

@RequestBody把请求体中流数据转换为指定的对象。多用在请求参数是json数据且请求的Content-Type=”application/json”

@RequestMapping("/demo4")
@ResponseBody
public String demo4(@RequestBody List<People> list) {
    System.out.println(list);
    return list.toString();
}

6   Ajax发送json参数写法

var json = '[{"id":123,"name":"msb"},{"id":123,"name":"mashibing"}]';
 $.ajax({
     url:'/demo5',
     type:'post',
     success:function(data){
         alert(data);
         for(var i = 0 ;i<data.length;i++){

             alert(data[i].id +"  "+data[i].name);
         }
     },
     contentType:'application/json',//请求体中内容类型
     dataType:'json',//响应内容类型。
     data:json
 });

7     跨域

跨域:协议、ip、端口中只要有一个不同就是跨域请求。

同源策略:浏览器默认只允许ajax访问同源(协议、ip、端口都相同)内容。

解决同源策略:

在控制器接口上添加@CrossOrigin。表示允许跨域。本质在响应头中添加Access-Control-Allow-Origin: *

var json = '[{"id":123,"name":"msb"},{"id":456,"name":"mashibing"}]';
 $.ajax({
     url:'/demo5',
     type:'post',
     success:function(data){
         alert(data);
         for(var i = 0 ;i<data.length;i++){

             alert(data[i].id +"  "+data[i].name);
         }
     },
     contentType:'application/json',//请求体中内容类型
     dataType:'json',//响应内容类型。
     data:json
 });

五、RMI实现RPC

1     RMI简介

RMI(Remote Method Invocation) 远程方法调用。

RMI是从JDK1.2推出的功能,它可以实现在一个Java应用中可以像调用本地方法一样调用另一个服务器中Java应用(JVM)中的内容。

RMI 是Java语言的远程调用,无法实现跨语言。

2     执行流程

Registry(注册表)是放置所有服务器对象的命名空间。 每次服务端创建一个对象时,它都会使用bind()或rebind()方法注册该对象。 这些是使用称为绑定名称的唯一名称注册的。

要调用远程对象,客户端需要该对象的引用。即通过服务端绑定的名称从注册表中获取对象(lookup()方法)。

3     API介绍

3.1   Remote

java.rmi.Remote 定义了此接口为远程调用接口。如果接口被外部调用,需要继承此接口。

public interface Remote{}

3.2   RemoteException

java.rmi.RemoteException

继承了Remote接口的接口中,如果方法是允许被远程调用的,需要抛出此异常。

3.3   UnicastRemoteObject

java.rmi.server.UnicastRemoteObject

此类实现了Remote接口和Serializable接口。

自定义接口实现类除了实现自定义接口还需要继承此类。

3.4   LocateRegistry

java.rmi.registry.LocateRegistry

可以通过LocateRegistry在本机上创建Registry,通过特定的端口就可以访问这个Registry。

3.5   Naming

java.rmi.Naming

Naming定义了发布内容可访问RMI名称。也是通过Naming获取到指定的远程方法。

4     代码实现

4.1   服务端创建

创建RmiServer项目

4.1.1    编写接口

com.msb.service.DemoService 编写

public interface DemoService extends Remote {
    String demo(String demo) throws RemoteException;
}
4.1.2    编写实现类

com.msb.service.impl.DemoServiceImpl 编写。

注意:构造方法是public的。默认生成protected

public class DemoServiceImpl extends UnicastRemoteObject implements DemoService {
    public DemoServiceImpl() throws RemoteException {
    }
    @Override
    public String demo(String demo) throws RemoteException {
        return demo+"123";
    }
}
4.1.3    编写主方法

编写com.msb.DemoServer类,生成主方法

public class DemoServiceImpl extends UnicastRemoteObject implements DemoService {
    public DemoServiceImpl() throws RemoteException {
    }
    @Override
    public String demo(String demo) throws RemoteException {
        return demo+"123";
    }
}
4.1.4    运行项目

运行后项目,项目一直处于启动状态,表示可以远程访问此项目中的远程方法。

4.2   创建客户端代码

创建项目RmiClient

4.2.1    复制服务端接口

把服务端com.msb.service.DemoService粘贴到项目中

4.2.2    创建主方法类

新建com.msb.DemoClient

public class DemoServiceImpl extends UnicastRemoteObject implements DemoService {
    public DemoServiceImpl() throws RemoteException {
    }
    @Override
    public String demo(String demo) throws RemoteException {
        return demo+"123";
    }
}

六、Zookeeper安装

1     Zookeeper简介

zookeeper分布式管理软件。常用它做注册中心(依赖zookeeper的发布/订阅功能)、配置文件中心、分布式锁配置、集群管理等。

zookeeper一共就有两个版本。主要使用的是java语言写的。

2     安装

2.1   上传压缩文件

上传到 /usr/local/tmp中

2.2   解压

# tar zxf apache-zookeeper-3.5.5-bin.tar.gz
# cp -r apache-zookeeper-3.5.5-bin ../zookeeper

2.3   新建data目录

进入到zookeeper中

# cd /usr/local/zookeeper
# mkdir data

2.4   修改配置文件

进入conf中

# cd conf
# cp zoo_sample.cfg zoo.cfg
# vim zoo.cfg

修改dataDir为data文件夹路径

dataDir=/usr/local/zookeeper/data

2.5   启动zookeeper

进入bin文件夹

# cd /usr/local/zookeeper/bin
# ./zkServer.sh start

通过status查看启动状态。稍微有个等待时间

# ./zkServer.sh status

七、Zookeeper客户端常用命令

进入到./zkCli.sh命令行工具后,可以使用下面常用命令

1    ls

ls [-s][-R] /path

-s 详细信息,替代老版的ls2

-R 当前目录和子目录中内容都罗列出来

例如:ls -R / 显示根目录下所有内容

2     create

create /path [data]

[data] 包含内容

创建指定路径信息

例如:create /demo 创建/demo

3     get

get [-s] /path

[-s] 详细信息

查看指定路径下内容。

例如: get -s /demo

null:存放的数据

cZxid:创建时zxid(znode每次改变时递增的事务id)

ctime:创建时间戳

mZxid:最近一次更新的zxid

mtime:最近一次更新的时间戳

pZxid:子节点的zxid

cversion:子节点更新次数

dataversion:节点数据更新次数

aclVersion:节点ACL(授权信息)的更新次数

ephemeralOwner:如果该节点为ephemeral节点(临时,生命周期与session一样), ephemeralOwner值表示与该节点绑定的session id. 如果该节点不是ephemeral节点, ephemeralOwner值为0.

dataLength:节点数据字节数

numChildren:子节点数量

4     set

set /path data

设置节点内容

5     delete

delete /path

删除节点

八、向Zookeeper中注册内容

新建项目ZookeeperClient

1     创建/demo

使用zookeeper的客户端命令工具创建/demo

./zkCli.sh
create /demos

2     添加依赖

<dependencies>
    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.5.5</version>
    </dependency>
</dependencies>

3     编写代码

创建类com.msb.MyApp。

ZooDefs.Ids.OPEN_ACL_UNSAFE 表示权限。

CreateMode.PERSISTENT_SEQUENTIAL 永久存储,文件内容编号递增。

public static void main(String [] args){
    try {
        ZooKeeper zookeeper = new ZooKeeper("192.168.32.128:2181", 10000, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                System.out.println("获取连接");
            }
        });
        String content = zookeeper.create("/demo/nn", "content".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
        System.out.println("content"+content);
    } catch (IOException e) {
        e.printStackTrace();
    } catch (KeeperException e) {
        e.printStackTrace();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

4     查看上传数据

ls -R /            :查看列表

get /demo/nn0000000002     :查看内容

九、    从zookeeper中发现内容

在原有项目中新建一个类,类中编写主方法。

public static void main(String[] args) {
    try {
        ZooKeeper zookeeper = new ZooKeeper("192.168.32.128:2181", 10000, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                System.out.println("获取连接");
            }
        });
        //获取列表
        List<String> list = zookeeper.getChildren("/demo", false);
        for (String child : list) {
            byte[] result = zookeeper.getData("/demo/" + child, false, null);
            System.out.println(new String(result));
        }
    } catch (IOException e) {
        e.printStackTrace();
    } catch (KeeperException e) {
        e.printStackTrace();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

十、手写RPC框架

使用Zookeeper作为注册中心,RMI作为连接技术,手写RPC框架。

1     创建项目ParentDemo

创建父项目ParentDemo。

包含3个聚合子项目。

pojo: service中需要的实体类

service:包含被serviceimpl和consumer依赖的接口。

serviceimpl:provider提供的服务内容

consumer:消费者,调用服务内容。

2     在父项目中添加依赖

public static void main(String[] args) {
    try {
        ZooKeeper zookeeper = new ZooKeeper("192.168.32.128:2181", 10000, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                System.out.println("获取连接");
            }
        });
        //获取列表
        List<String> list = zookeeper.getChildren("/demo", false);
        for (String child : list) {
            byte[] result = zookeeper.getData("/demo/" + child, false, null);
            System.out.println(new String(result));
        }
    } catch (IOException e) {
        e.printStackTrace();
    } catch (KeeperException e) {
        e.printStackTrace();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

3     创建service项目

4     创建DemoService接口

创建com.msb.DemoService

public interface DemoService extends Remote {
    String demo(String param) throws RemoteException;
}

5     创建serviceimpl项目

此项目编写接口具体实现,RMI服务发布和把信息发送到Zookeeper中。

项目结构如下:

在pom.xml中添加对service项目的依赖

<dependencies>
    <dependency>
        <artifactId>service</artifactId>
        <groupId>com.msb</groupId>
        <version>1.0.0</version>
    </dependency>
</dependencies>

6     创建DemoServiceImpl

创建com.bjsxt.service.impl.DemoServiceImpl

public class DemoServiceImpl extends UnicastRemoteObject implements DemoService {
    public DemoServiceImpl() throws RemoteException {
    }

    @Override
    public String demo(String param) throws RemoteException{
        return param+"123";
    }
}

7     创建RmiRun

创建com.msb.RmiRun。实现RMI服务的发布和Zookeeper消息的发布。

public class RmiRun {
    public static void main(String[] args) {
        try {
            DemoService demoService = new DemoServiceImpl();
            LocateRegistry.createRegistry(8888);
            String url = "rmi://localhost:8888/demoService";
            Naming.bind(url,demoService);
            ZooKeeper zookeeper = new ZooKeeper("192.168.32.128:2181", 10000, new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {
                    System.out.println("获取连接");
                }
            });
            String content = zookeeper.create("/demo/demoService",url.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            System.out.println("服务发布成功。。。。");
        } catch (AlreadyBoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

8     创建Consumer项目

新建consumer项目,此项目需要从zookeeper中获取rmi信息,并调用rmi服务

在pom.xml中添加对service项目的依赖

<dependencies>
    <dependency>
        <artifactId>service</artifactId>
        <groupId>com.msb</groupId>
        <version>1.0.0</version>
    </dependency>
</dependencies>

9     创建接口和实现类

创建com.msb.service.ConsumerService接口

创建com.msb.service.impl.ConsumerServiceImpl实现类

public interface ConsumerService {
    String consumerService(String param);
}
@Service
public class ConsumerServiceImpl implements ConsumerService {
    @Override
    public String consumerService(String param) {
        try {
            ZooKeeper zookeeper = new ZooKeeper("192.168.32.128:2181", 10000, new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {
                    System.out.println("获取连接");
                }
            });
            byte[] urlByte = zookeeper.getData("/demo/demoService", false, null);
            DemoService demoService =(DemoService) Naming.lookup(new String(urlByte));
            String result = demoService.demo(param);
            System.out.println(result);
            return result;
        } catch (IOException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (NotBoundException e) {
            e.printStackTrace();
        }
        return null;
    }
}

10  创建控制器

创建com.msb.controller.DemoController控制器

@Controller
public class DemoController {

    @Autowired
    private ConsumerService consumerService;

    @RequestMapping("/demo")
    @ResponseBody
    public String demo(String param){
        return consumerService.consumerService(param);
    }
}

11  创建启动器

创建com.msb.ConsumerApplication

@SpringBootApplication
public class ConsumerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class,args);
    }
}

12  测试

在浏览器输入:http://localhost:8080/demo?param=demo

观察结果是否是:demo123

gRPC

  1. gRPC 是一个高性能、开源和通用的 RPC 框架,面向移动和 HTTP/2 设计。
  2. gRPC 基于 HTTP/2 标准设计,带来诸如双向流、流控、头部压缩、单 TCP 连接上的多复用请求等特。
  3. 在 gRPC 里_客户端_应用可以像调用本地对象一样直接调用另一台不同的机器上_服务端_应用的方法,使得您能够更容易地创建分布式应用和服务。与许多 RPC 系统类似,gRPC 也是基于以下理念:定义一个_服务_,指定其能够被远程调用的方法(包含参数和返回类型)。在服务端实现这个接口,并运行一个 gRPC 服务器来处理客户端调用。在客户端拥有一个_存根_能够像服务端一样的方法。

![GRPC进程调用图](https://cdn.nlark.com/yuque/0/2023/png/12519343/1685622700940-5818eae7-b523-4cc3-a4a9-82137c2f1390.png#averageHue=%23fcf1dd&clientId=u0b3f633e-7efe-4&from=paste&height=490&id=u5f314b75&originHeight=490&originWidth=764&originalType=binary&ratio=1&rotation=0&showTitle=true&size=54588&status=done&style=none&taskId=u5d87c6e5-e05b-43c3-8460-55a22845b48&title=GRPC%E8%BF%9B%E7%A8%8B%E8%B0%83%E7%94%A8%E5%9B%BE&width=764 “GRPC进程调用图”)

gRPC与HTTP2

gRPC引入了三个新概念:channel、RPC和Message。三者之间的关系很简单:每个Channel可能有许多RPC,而每个RPC可能有许多Message。HTTP/2 提供给 GRPC长链接和实时通讯。

1、HTTP/2协议特点

  • 二进制格式,协议解析使用二进制格式
  • 多路复用,即连接共享,即每一个request都是使用连接共享机制的。
  • header压缩,HTTP2.0使用encoder来减少需要传输的header大小,通讯双方各自cache一份header fields表,既避免了重复header的传输,又减小了需要传输的大小。
  • 服务端推送

2、GRPC关联HTTP/2

HTTP/2中的流在一个连接上允许多个并发会话,而gRPC的通过支持多个并发连接上的多个流扩展了这个概念。

  • Channel: 表示和终端的一个虚拟链接

Channel 背后实际上可能有多个HTTP/2 连接。从上面关系图来看,一个RPC和一个HTTP/2连接相关联,rpc实际上是纯HTTP/2流。Message与rpc关联,并以HTTP/2数据帧的形式发送。

消息是在数据帧之上分层的。一个数据帧可能有许多gRPC消息,或者如果一个gRPC消息非常大,它可能跨越多个数据帧。

Protocol Buffers

一个与语言和平台无关具有可拓展的用于序列化结构化的数据(例如:XML、JSON)的协议,只需定义数据的结构化方式,然后就可以使用特殊生成的源代码轻松地向各种数据流写入和读取结构化数据,并可以被各种语言使用。**Protocol Buffers 负责gRPC的结构化数据的序列化**

1、类型

  • 字符串:string ,(默认为””)
  • 整型:int32, int64, uint32, uint64 (默认为0)
  • 浮点: double, float (默认为0)
  • 字节流: bytes (默认为nil)
  • 布尔类型:bool ,(默认false)
  • 枚举:enum (默认为0,枚举的第一个值必须是0)
  • map类型: 比如 map&lt;string,string&gt; (默认为nil)
  • oneof 类型,它是一个结构体,但是它定义你结构体中只能赋值一个字段,就算你有多个字段!(默认为nil)
oneof test_oneof {
        string v3 = 3;
        uint32 v4 = 4;
    }
// 赋值时只能使用V3或V4

2、编号

消息定义中的每个字段都有一个唯一的编号。这些字段编号用于以消息二进制格式标识字段,在使用消息类型后不应更改。注意,范围1到15中的字段编号需要一个字节进行编码,包括字段编号和字段类型。范围16到2047的字段编号采用两个字节。因此,应该为经常出现的消息元素保留数字1到15。

3、字段规则

  • singular,默认类型就是这个,不需要申明
  • repeated, 类似于数组,go里面默认就转换成了数组,是个list所以不会自动去重

GRPC中的4种类型的服务方法

1、单向RPC

普通的接口服务方法,相当于一次函数调用。

// 定义方式
rpc getMessById(IdRequest) returns (MessResponse){}

2、服务端流式 RPC

客户端发送一个请求给服务端,可获取一个数据流用来读取一系列消息。客户端从返回的数据流里一直读取直到没有更多消息为止,例如客户端想服务端请求获取后续某时间段的信息,后续时间内每产生一条信息,客户端就会返回给用户,直接时间段结束。

rpc getNewMessInTime(TimeRequest) returns (stream MessResponse){}

3、客户端流式RPC

客户端用提供的一个数据流写入并发送一系列消息给服务端。一旦客户端完成消息写入,就等待服务端读取这些消息并返回应答,例如用户在购物时,不停地向购物车中增加商品,直到用户下单结束请求。

rpc putGoodsAndBuy(stream GoodsRequest) returns (ResponseCode){}

4、双向流RPC

客户端和服务端都可以分别通过一个读写数据流来发送一系列消息。这两个数据流操作是相互独立的,所以客户端和服务端能按其希望的任意顺序读写。例如可以使用双向流RPC实现一个简易的聊天室能力,双方持续输入一些信息,会根据信息发送的顺序进行读取和展示。

rpc chat(stream RequestMess) returns (stream ResponseMess){}

GRPC四种类型服务实现方式

1、定义服务

可以看做是Dubbo中的暴露接口,指定其可以被远程调用的方法及其参数和返回类型,但是grpc是使用protocol buffers 作为接口定义语言。可以编译.proto文件生成服务端和客户端的代码。

/使用proto3语法
syntax = &quot;proto3&quot;;

//这个字段是可选的,如果设置为 true,表示每一个 message 文件都会有一个单独的 class 文件;
//否则,message 全部定义在 outerclass 文件里。
option java_multiple_files = true;

//这个字段是可选的,用于标识生成的 java 文件的 package。
//如果没有指定,则使用 proto 里定义的 package,如果package 也没有指定,那就会生成在根目录下。
option java_package = &quot;io.renxing.grpc.proto&quot;;

//这个字段是可选的,用于指定 proto 文件生成的 java 类的 outerclass 类名。
//outerclass:简单来说就是用一个 class 文件来定义所有的 message 对应的 Java 类,
//这个 class 就是 outerclass;如果没有指定,默认是 proto 文件的驼峰式;
option java_outer_classname = &quot;HelloProto&quot;;

//option objc_class_prefix = &quot;HL&quot;;
//这个属性用来定义 message 的包名。包名的含义与平台语言无关,
//这个 package 仅仅被用在 proto 文件中用于区分同名的 message 类型。
//可以理解为 message 全名的前缀,和 message 名称合起来唯一标识一个 message 类型。
//当我们在 proto 文件中导入其他 proto 文件的 message,需要加上 package 前缀才行。
//所以包名是用来唯一标识 message 的。
package news;

//定义grpc服务RouteGuide
service NewsService {
    // list方法名,NewsRequest代表传入的参数,NewsResponse 代表返回的响应
    rpc list (NewsRequest) returns (NewsResponse) {}
}

// The request message containing the user&#39;s name.
//类型于Java中的实体类
message NewsRequest {
    string date =1 ; // =1 其实是编号,
}

// The response message containing the greetings
message NewsResponse {
    repeated News news = 1;
}

//News 新闻实体对象
message News{
    int32  id =1;
    string title =2 ;
    string content =3;
    int64  createTime =4;
}

项目整合grpc依赖和proto插件生成代码

&lt;dependency&gt;
    &lt;groupId&gt;com.google.protobuf&lt;/groupId&gt;
    &lt;artifactId&gt;protobuf-java&lt;/artifactId&gt;
    &lt;version&gt;${protobuf.version}&lt;/version&gt;
&lt;/dependency&gt;

&lt;dependency&gt;
    &lt;groupId&gt;io.grpc&lt;/groupId&gt;
    &lt;artifactId&gt;grpc-all&lt;/artifactId&gt;
    &lt;version&gt;${grpc.version}&lt;/version&gt;
&lt;/dependency&gt;

&lt;build&gt;
    &lt;!-- os系统信息插件, protobuf-maven-plugin需要获取系统信息下载相应的protobuf程序 --&gt;
    &lt;extensions&gt;
        &lt;extension&gt;
            &lt;groupId&gt;kr.motd.maven&lt;/groupId&gt;
            &lt;artifactId&gt;os-maven-plugin&lt;/artifactId&gt;
            &lt;version&gt;1.6.2&lt;/version&gt;
        &lt;/extension&gt;
    &lt;/extensions&gt;
    &lt;plugins&gt;
        &lt;plugin&gt;
            &lt;!--                通过定义的环境变量找到具体的protobuf编译器位置--&gt;
            &lt;groupId&gt;org.xolstice.maven.plugins&lt;/groupId&gt;
            &lt;!--                通过这个插件maven自动根据proto文件生成Java代码--&gt;
            &lt;artifactId&gt;protobuf-maven-plugin&lt;/artifactId&gt;
            &lt;version&gt;0.5.1&lt;/version&gt;

            &lt;configuration&gt;
                &lt;!--                    用来编译生成文件--&gt;
                &lt;protocArtifact&gt;com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}
                &lt;/protocArtifact&gt;
                &lt;pluginId&gt;grpc-java&lt;/pluginId&gt;
                &lt;!--                    用于生成grpc的工具类,用于简化实际的数据处理过程,生成的代码简化程序开发工作--&gt;
                &lt;pluginArtifact&gt;io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}
                &lt;/pluginArtifact&gt;
                &lt;!-- proto文件目录 --&gt;
                &lt;protoSourceRoot&gt;${project.basedir}/src/main/proto&lt;/protoSourceRoot&gt;
                &lt;!-- 生成的Java文件目录 --&gt;
                &lt;outputDirectory&gt;${project.basedir}/src/main/java/&lt;/outputDirectory&gt;
                &lt;clearOutputDirectory&gt;false&lt;/clearOutputDirectory&gt;
            &lt;/configuration&gt;

            &lt;executions&gt;
                &lt;execution&gt;
                    &lt;goals&gt;
                        &lt;!--                            生成消息代码--&gt;
                        &lt;goal&gt;compile&lt;/goal&gt;
                        &lt;!--                            生成grpc的通讯文件--&gt;
                        &lt;goal&gt;compile-custom&lt;/goal&gt;
                    &lt;/goals&gt;
                &lt;/execution&gt;
            &lt;/executions&gt;
        &lt;/plugin&gt;
    &lt;/plugins&gt;
&lt;/build&gt;

2、服务端实现

package io.renxing.grpc.service;

import io.grpc.stub.StreamObserver;
import io.renxing.grpc.proto.News;
import io.renxing.grpc.proto.NewsRequest;
import io.renxing.grpc.proto.NewsResponse;
import io.renxing.grpc.proto.NewsServiceGrpc;

/**
 * @author zzli16
 * @version 1.0.0
 * @ClassName NewsService
 * @description GRPC服务实现类
 * @date 2023/5/25 10:17
 * 继承NewsServiceGrpc.NewsServiceImplBase类并重写list方法
 * grpc当中是一个双供协议,为了实现客户端于服务端之间的双向供应,返回值通过StreamObserver这个观察类作为最后一个参数进行传入
 * 获取在proto文件当中定义传入的参数值
 * 利用构造器模式newListbuilder对接收newList进行构建[newBuilder]
 * 返回值,这里不使用return直接返回,而是用responseObserver.onNext(newList);的方式进行返回,onNext就涉及到双供协议数据流的过程
 * 通知网络自己已经完成
 */
public class NewsService extends NewsServiceGrpc.NewsServiceImplBase {

    //重写Newsproto里面的list父类方法实现业务逻辑的实现
    //grpc当中是一个双供协议,为了实现客户端于服务端之间的双向供应,返回值通过StreamObserver这个观察类作为最后一个参数进行传入
    @Override
    public void list(NewsRequest request, StreamObserver&lt;NewsResponse&gt; responseObserver) {
        // 获取request参数
        String date = request.getDate();
        NewsResponse newList = null;
        try {
            // 构造器模式newListBuilder对接newList进行构建
            NewsResponse.Builder newListBuilder = NewsResponse.newBuilder();
            for (int i = 0; i &lt; 100; i++) {
                News news = News.newBuilder()
                        .setId(i)
                        .setContent(&quot;新闻标题&quot; + i)
                        .setCreateTime(System.currentTimeMillis())
                        .build();
                newListBuilder.addNews(news);
            }
            newList = newListBuilder.build();
        } catch (Exception e) {
            responseObserver.onError(e);
        } finally {
            responseObserver.onNext(newList);
        }
        // 通知网络服务完成
        responseObserver.onCompleted();
    }
}

// 服务端启动
public class GrpcServer {
    private static final int prot = 8989;

    public static void main(String[] args) throws IOException, InterruptedException {
          //创建一个server对象,声明监听端口(这里的端口自定义的,注意不要重复出现,防止端口占用)
          //同时通过new方法实例化NewsService(),通过addService加入其中,说明grpc需要管理这个NewService对象,这样远程才会发起对NewService的调用工作.build()完成构建,start()启动服务
          //调用.awaitTermination()方法,使得服务处于等待服务关闭,这样是服务一直等待使用的状态了
        Server server = ServerBuilder.forPort(prot).addService(new NewsService()).build().start();
        System.out.println(&quot;GRPC服务端启动成功,启动端口:&quot; + prot);
        server.awaitTermination();
    }
}

3、客户端实现调用

public class GrpcClient {
    private static final String host = &quot;localhost&quot;;
    private static final int port = 8989;

    public static void main(String[] args) {
        // 创建一个通讯管道channel, 构造器传入定义的服务IP和端口,usePlaintext标识一个文本传输通道
        ManagedChannel channel = ManagedChannelBuilder.forAddress(host, port)
                .usePlaintext().build();
        try {
            NewsServiceGrpc.NewsServiceBlockingStub blockingStub = NewsServiceGrpc.newBlockingStub(channel);
            NewsRequest request = NewsRequest.newBuilder().setDate(&quot;20230525&quot;).build();
            NewsResponse response = blockingStub.list(request);
            List&lt;News&gt; newsList = response.getNewsList();
            for (News news : newsList) {
                System.out.println(news.getId() + &quot;:&quot; + news.getContent());
            }
        } finally {
            channel.shutdown();
        }
    }
}

四种通信模式 GRPC参考文章:https://juejin.cn/post/7198041490618023973

双向流 RPC

客户端可以多次发送数据,服务端可以多次相应数据。流RPC操作的对象都是StreamObserver 对象,调用StreamObserver 对象的onNext()方法就是传递相应数据,onCompleted()方法结束传递。

public class BookServiceImpl extends BookServiceGrpc.BookServiceImplBase {
    private Map&lt;String, Book&gt; bookMap = new HashMap&lt;&gt;();
    private List&lt;Book&gt; books = new ArrayList&lt;&gt;();

    public BookServiceImpl() {
        Book b1 = Book.newBuilder().setId(&quot;1&quot;).setName(&quot;三国演义&quot;).setAuthor(&quot;罗贯中&quot;).setPrice(30).addTags(&quot;明清小说&quot;).addTags(&quot;通俗小说&quot;).build();
        Book b2 = Book.newBuilder().setId(&quot;2&quot;).setName(&quot;西游记&quot;).setAuthor(&quot;吴承恩&quot;).setPrice(40).addTags(&quot;志怪小说&quot;).addTags(&quot;通俗小说&quot;).build();
        Book b3 = Book.newBuilder().setId(&quot;3&quot;).setName(&quot;水浒传&quot;).setAuthor(&quot;施耐庵&quot;).setPrice(50).addTags(&quot;明清小说&quot;).addTags(&quot;通俗小说&quot;).build();
        bookMap.put(&quot;1&quot;, b1);
        bookMap.put(&quot;2&quot;, b2);
        bookMap.put(&quot;3&quot;, b3);
    }
    @Override
    public StreamObserver&lt;StringValue&gt; processBooks(StreamObserver&lt;BookSet&gt; responseObserver) {
        return new StreamObserver&lt;StringValue&gt;() {
            @Override
            public void onNext(StringValue stringValue) {
                Book b = Book.newBuilder().setId(stringValue.getValue()).build();
                books.add(b);
                if (books.size() == 3) {
                    BookSet bookSet = BookSet.newBuilder().addAllBookList(books).build();
                    responseObserver.onNext(bookSet);
                    books.clear();
                }
            }

            @Override
            public void onError(Throwable throwable) {

            }

            @Override
            public void onCompleted() {
                BookSet bookSet = BookSet.newBuilder().addAllBookList(books).build();
                responseObserver.onNext(bookSet);
                books.clear();
                responseObserver.onCompleted();
            }
        };
    }
}


public class BookServiceClient {
    public static void main(String[] args) throws InterruptedException {
        ManagedChannel channel = ManagedChannelBuilder.forAddress(&quot;localhost&quot;, 50051)
                .usePlaintext()
                .build();
        BookServiceGrpc.BookServiceStub stub = BookServiceGrpc.newStub(channel);
        processBook(stub);
    }

    private static void processBook(BookServiceGrpc.BookServiceStub stub) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        StreamObserver&lt;StringValue&gt; request = stub.processBooks(new StreamObserver&lt;BookSet&gt;() {
            @Override
            public void onNext(BookSet bookSet) {
                System.out.println(&quot;bookSet = &quot; + bookSet);
                System.out.println(&quot;=============&quot;);
            }

            @Override
            public void onError(Throwable throwable) {
            }

            @Override
            public void onCompleted() {
                System.out.println(&quot;处理完毕!&quot;);
                countDownLatch.countDown();
            }
        });
        request.onNext(StringValue.newBuilder().setValue(&quot;a&quot;).build());
        request.onNext(StringValue.newBuilder().setValue(&quot;b&quot;).build());
        request.onNext(StringValue.newBuilder().setValue(&quot;c&quot;).build());
        request.onNext(StringValue.newBuilder().setValue(&quot;d&quot;).build());
        request.onCompleted();
        countDownLatch.await();
    }
}

发表评论

您的邮箱地址不会被公开。 必填项已用 * 标注

滚动至顶部