fabric创建通道过程

2024-08-29 16:08
文章标签 创建 过程 通道 fabric

本文主要是介绍fabric创建通道过程,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

创建通道

  • 1. Client 处理过程
    • 1.1. 发送创建通道的交易
    • 1.2. 获取通道的创世区块
  • 2. Orderer 处理过程
    • 2.1. 处理消息
  • 3. 参考资料

创建通道的过程

Client Orderer 用户指定或从默认配置中生成创建通道的交易mychannel.tx, 该消息是Envelope结构的CONFIG_UPDATE类型消息 对mychannel.tx进行签名并封装 发送封装好的消息 接收消息 获取通道消息处理器,- 因为是创建通道,<br- >则调用默认的系统通- 道管理器 对消息进行重新封装, 头部消息类型为ORDERER_TRANSACTION, 内层消息类型为CONFIG,每次封装都会签名 返回处理结果 将配置交易消息构造成创世区块 并保存到新通道的数据文件中 获取区块号为0的创世区块 loop [Client每200毫秒请求一次] par [Orderer处理] [Client处理] 返回创世区块 保存创世区块 Client Orderer

1. Client 处理过程

internal/peer/channel/create.go

func executeCreate(cf *ChannelCmdFactory) error {// 发送创建通道的交易err := sendCreateChainTransaction(cf)if err != nil {return err}// 获取通道的创世区块block, err := getGenesisBlock(cf)if err != nil {return err}b, err := proto.Marshal(block)if err != nil {return err}file := channelID + ".block"if outputBlock != common.UndefinedParamValue {file = outputBlock}// 保存通道创世区块信息err = ioutil.WriteFile(file, b, 0o644)if err != nil {return err}return nil
}

1.1. 发送创建通道的交易

internal/peer/channel/create.go

func sendCreateChainTransaction(cf *ChannelCmdFactory) error {var err errorvar chCrtEnv *cb.Envelope// 如果传入创建通道的交易信息,则读取并解析if channelTxFile != "" {if chCrtEnv, err = createChannelFromConfigTx(channelTxFile); err != nil {return err}} else { // 如果没有传入,则根据默认的配置yaml进行创建if chCrtEnv, err = createChannelFromDefaults(cf); err != nil {return err}}// 对交易信息进行必要检查,通过后对消息进行签名if chCrtEnv, err = sanityCheckAndSignConfigTx(chCrtEnv, cf.Signer); err != nil {return err}var broadcastClient common.BroadcastClientbroadcastClient, err = cf.BroadcastFactory()if err != nil {return errors.WithMessage(err, "error getting broadcast client")}defer broadcastClient.Close()// 发送封装好的消息err = broadcastClient.Send(chCrtEnv)return err
}

1.2. 获取通道的创世区块

每隔 200 毫秒尝试从排序节点获取一次通道的创世区块,即区块号为 0 的区块。

internal/peer/channel/create.go

func getGenesisBlock(cf *ChannelCmdFactory) (*cb.Block, error) {timer := time.NewTimer(timeout)defer timer.Stop()for {select {case <-timer.C:cf.DeliverClient.Close()return nil, errors.New("timeout waiting for channel creation")default:// 获取创世区块if block, err := cf.DeliverClient.GetSpecifiedBlock(0); err != nil {cf.DeliverClient.Close()cf, err = InitCmdFactory(EndorserNotRequired, PeerDeliverNotRequired, OrdererRequired)if err != nil {return nil, errors.WithMessage(err, "failed connecting")}// 休眠 200 毫秒time.Sleep(200 * time.Millisecond)} else {cf.DeliverClient.Close()return block, nil}}}
}

2. Orderer 处理过程

orderer/common/broadcast/broadcast.go

// Handle reads requests from a Broadcast stream, processes them, and returns the responses to the stream
func (bh *Handler) Handle(srv ab.AtomicBroadcast_BroadcastServer) error {addr := util.ExtractRemoteAddress(srv.Context())logger.Debugf("Starting new broadcast loop for %s", addr)for {// 接收消息msg, err := srv.Recv()if err == io.EOF {logger.Debugf("Received EOF from %s, hangup", addr)return nil}if err != nil {logger.Warningf("Error reading from %s: %s", addr, err)return err}// 处理消息resp := bh.ProcessMessage(msg, addr)// 发送处理结果err = srv.Send(resp)if resp.Status != cb.Status_SUCCESS {return err}if err != nil {logger.Warningf("Error sending to %s: %s", addr, err)return err}}
}

2.1. 处理消息

orderer/common/broadcast/broadcast.go

// ProcessMessage validates and enqueues a single message
func (bh *Handler) ProcessMessage(msg *cb.Envelope, addr string) (resp *ab.BroadcastResponse) {tracker := &MetricsTracker{ChannelID: "unknown",TxType:    "unknown",Metrics:   bh.Metrics,}defer func() {// This looks a little unnecessary, but if done directly as// a defer, resp gets the (always nil) current state of resp// and not the return valuetracker.Record(resp)}()tracker.BeginValidate()// 获取通道消息处理器,如果是创建通道,则调用默认的系统通道管理器chdr, isConfig, processor, err := bh.SupportRegistrar.BroadcastChannelSupport(msg)if chdr != nil {tracker.ChannelID = chdr.ChannelIdtracker.TxType = cb.HeaderType(chdr.Type).String()}if err != nil {logger.Warningf("[channel: %s] Could not get message processor for serving %s: %s", tracker.ChannelID, addr, err)return &ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST, Info: err.Error()}}if !isConfig {logger.Debugf("[channel: %s] Broadcast is processing normal message from %s with txid '%s' of type %s", chdr.ChannelId, addr, chdr.TxId, cb.HeaderType_name[chdr.Type])configSeq, err := processor.ProcessNormalMsg(msg)if err != nil {logger.Warningf("[channel: %s] Rejecting broadcast of normal message from %s because of error: %s", chdr.ChannelId, addr, err)return &ab.BroadcastResponse{Status: ClassifyError(err), Info: err.Error()}}tracker.EndValidate()tracker.BeginEnqueue()if err = processor.WaitReady(); err != nil {logger.Warningf("[channel: %s] Rejecting broadcast of message from %s with SERVICE_UNAVAILABLE: rejected by Consenter: %s", chdr.ChannelId, addr, err)return &ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE, Info: err.Error()}}err = processor.Order(msg, configSeq)if err != nil {logger.Warningf("[channel: %s] Rejecting broadcast of normal message from %s with SERVICE_UNAVAILABLE: rejected by Order: %s", chdr.ChannelId, addr, err)return &ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE, Info: err.Error()}}} else { // 对于创建通道,isConfig 值为 true,执行以下逻辑logger.Debugf("[channel: %s] Broadcast is processing config update message from %s", chdr.ChannelId, addr)// 对消息进行重新封装,头部消息类型为ORDERER_TRANSACTION,内层消息类型为CONFIGconfig, configSeq, err := processor.ProcessConfigUpdateMsg(msg)if err != nil {logger.Warningf("[channel: %s] Rejecting broadcast of config message from %s because of error: %s", chdr.ChannelId, addr, err)return &ab.BroadcastResponse{Status: ClassifyError(err), Info: err.Error()}}tracker.EndValidate()tracker.BeginEnqueue()if err = processor.WaitReady(); err != nil {logger.Warningf("[channel: %s] Rejecting broadcast of message from %s with SERVICE_UNAVAILABLE: rejected by Consenter: %s", chdr.ChannelId, addr, err)return &ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE, Info: err.Error()}}// 将配置更新消息进行广播err = processor.Configure(config, configSeq)if err != nil {logger.Warningf("[channel: %s] Rejecting broadcast of config message from %s with SERVICE_UNAVAILABLE: rejected by Configure: %s", chdr.ChannelId, addr, err)return &ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE, Info: err.Error()}}}logger.Debugf("[channel: %s] Broadcast has successfully enqueued message of type %s from %s", chdr.ChannelId, cb.HeaderType_name[chdr.Type], addr)return &ab.BroadcastResponse{Status: cb.Status_SUCCESS}
}

3. 参考资料

  • Fabric创建通道流程解析

这篇关于fabric创建通道过程的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1118329

相关文章

Python中使用uv创建环境及原理举例详解

《Python中使用uv创建环境及原理举例详解》uv是Astral团队开发的高性能Python工具,整合包管理、虚拟环境、Python版本控制等功能,:本文主要介绍Python中使用uv创建环境及... 目录一、uv工具简介核心特点:二、安装uv1. 通过pip安装2. 通过脚本安装验证安装:配置镜像源(可

canal实现mysql数据同步的详细过程

《canal实现mysql数据同步的详细过程》:本文主要介绍canal实现mysql数据同步的详细过程,本文通过实例图文相结合给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的... 目录1、canal下载2、mysql同步用户创建和授权3、canal admin安装和启动4、canal

MySQL存储过程之循环遍历查询的结果集详解

《MySQL存储过程之循环遍历查询的结果集详解》:本文主要介绍MySQL存储过程之循环遍历查询的结果集,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录前言1. 表结构2. 存储过程3. 关于存储过程的SQL补充总结前言近来碰到这样一个问题:在生产上导入的数据发现

SpringBoot集成LiteFlow实现轻量级工作流引擎的详细过程

《SpringBoot集成LiteFlow实现轻量级工作流引擎的详细过程》LiteFlow是一款专注于逻辑驱动流程编排的轻量级框架,它以组件化方式快速构建和执行业务流程,有效解耦复杂业务逻辑,下面给大... 目录一、基础概念1.1 组件(Component)1.2 规则(Rule)1.3 上下文(Conte

Java中实现线程的创建和启动的方法

《Java中实现线程的创建和启动的方法》在Java中,实现线程的创建和启动是两个不同但紧密相关的概念,理解为什么要启动线程(调用start()方法)而非直接调用run()方法,是掌握多线程编程的关键,... 目录1. 线程的生命周期2. start() vs run() 的本质区别3. 为什么必须通过 st

Spring Boot 整合 Apache Flink 的详细过程

《SpringBoot整合ApacheFlink的详细过程》ApacheFlink是一个高性能的分布式流处理框架,而SpringBoot提供了快速构建企业级应用的能力,下面给大家介绍Spri... 目录Spring Boot 整合 Apache Flink 教程一、背景与目标二、环境准备三、创建项目 & 添

Macos创建python虚拟环境的详细步骤教学

《Macos创建python虚拟环境的详细步骤教学》在macOS上创建Python虚拟环境主要通过Python内置的venv模块实现,也可使用第三方工具如virtualenv,下面小编来和大家简单聊聊... 目录一、使用 python 内置 venv 模块(推荐)二、使用 virtualenv(兼容旧版 P

pytest+allure环境搭建+自动化实践过程

《pytest+allure环境搭建+自动化实践过程》:本文主要介绍pytest+allure环境搭建+自动化实践过程,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐... 目录一、pytest下载安装1.1、安装pytest1.2、检测是否安装成功二、allure下载安装2.

Pytorch介绍与安装过程

《Pytorch介绍与安装过程》PyTorch因其直观的设计、卓越的灵活性以及强大的动态计算图功能,迅速在学术界和工业界获得了广泛认可,成为当前深度学习研究和开发的主流工具之一,本文给大家介绍Pyto... 目录1、Pytorch介绍1.1、核心理念1.2、核心组件与功能1.3、适用场景与优势总结1.4、优

Redis指南及6.2.x版本安装过程

《Redis指南及6.2.x版本安装过程》Redis是完全开源免费的,遵守BSD协议,是一个高性能(NOSQL)的key-value数据库,Redis是一个开源的使用ANSIC语言编写、支持网络、... 目录概述Redis特点Redis应用场景缓存缓存分布式会话分布式锁社交网络最新列表Redis各版本介绍旧