fabric创建通道过程

2024-08-29 16:08
文章标签 创建 过程 通道 fabric

本文主要是介绍fabric创建通道过程,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

创建通道

  • 1. Client 处理过程
    • 1.1. 发送创建通道的交易
    • 1.2. 获取通道的创世区块
  • 2. Orderer 处理过程
    • 2.1. 处理消息
  • 3. 参考资料

创建通道的过程

Client Orderer 用户指定或从默认配置中生成创建通道的交易mychannel.tx, 该消息是Envelope结构的CONFIG_UPDATE类型消息 对mychannel.tx进行签名并封装 发送封装好的消息 接收消息 获取通道消息处理器,- 因为是创建通道,<br- >则调用默认的系统通- 道管理器 对消息进行重新封装, 头部消息类型为ORDERER_TRANSACTION, 内层消息类型为CONFIG,每次封装都会签名 返回处理结果 将配置交易消息构造成创世区块 并保存到新通道的数据文件中 获取区块号为0的创世区块 loop [Client每200毫秒请求一次] par [Orderer处理] [Client处理] 返回创世区块 保存创世区块 Client Orderer

1. Client 处理过程

internal/peer/channel/create.go

func executeCreate(cf *ChannelCmdFactory) error {// 发送创建通道的交易err := sendCreateChainTransaction(cf)if err != nil {return err}// 获取通道的创世区块block, err := getGenesisBlock(cf)if err != nil {return err}b, err := proto.Marshal(block)if err != nil {return err}file := channelID + ".block"if outputBlock != common.UndefinedParamValue {file = outputBlock}// 保存通道创世区块信息err = ioutil.WriteFile(file, b, 0o644)if err != nil {return err}return nil
}

1.1. 发送创建通道的交易

internal/peer/channel/create.go

func sendCreateChainTransaction(cf *ChannelCmdFactory) error {var err errorvar chCrtEnv *cb.Envelope// 如果传入创建通道的交易信息,则读取并解析if channelTxFile != "" {if chCrtEnv, err = createChannelFromConfigTx(channelTxFile); err != nil {return err}} else { // 如果没有传入,则根据默认的配置yaml进行创建if chCrtEnv, err = createChannelFromDefaults(cf); err != nil {return err}}// 对交易信息进行必要检查,通过后对消息进行签名if chCrtEnv, err = sanityCheckAndSignConfigTx(chCrtEnv, cf.Signer); err != nil {return err}var broadcastClient common.BroadcastClientbroadcastClient, err = cf.BroadcastFactory()if err != nil {return errors.WithMessage(err, "error getting broadcast client")}defer broadcastClient.Close()// 发送封装好的消息err = broadcastClient.Send(chCrtEnv)return err
}

1.2. 获取通道的创世区块

每隔 200 毫秒尝试从排序节点获取一次通道的创世区块,即区块号为 0 的区块。

internal/peer/channel/create.go

func getGenesisBlock(cf *ChannelCmdFactory) (*cb.Block, error) {timer := time.NewTimer(timeout)defer timer.Stop()for {select {case <-timer.C:cf.DeliverClient.Close()return nil, errors.New("timeout waiting for channel creation")default:// 获取创世区块if block, err := cf.DeliverClient.GetSpecifiedBlock(0); err != nil {cf.DeliverClient.Close()cf, err = InitCmdFactory(EndorserNotRequired, PeerDeliverNotRequired, OrdererRequired)if err != nil {return nil, errors.WithMessage(err, "failed connecting")}// 休眠 200 毫秒time.Sleep(200 * time.Millisecond)} else {cf.DeliverClient.Close()return block, nil}}}
}

2. Orderer 处理过程

orderer/common/broadcast/broadcast.go

// Handle reads requests from a Broadcast stream, processes them, and returns the responses to the stream
func (bh *Handler) Handle(srv ab.AtomicBroadcast_BroadcastServer) error {addr := util.ExtractRemoteAddress(srv.Context())logger.Debugf("Starting new broadcast loop for %s", addr)for {// 接收消息msg, err := srv.Recv()if err == io.EOF {logger.Debugf("Received EOF from %s, hangup", addr)return nil}if err != nil {logger.Warningf("Error reading from %s: %s", addr, err)return err}// 处理消息resp := bh.ProcessMessage(msg, addr)// 发送处理结果err = srv.Send(resp)if resp.Status != cb.Status_SUCCESS {return err}if err != nil {logger.Warningf("Error sending to %s: %s", addr, err)return err}}
}

2.1. 处理消息

orderer/common/broadcast/broadcast.go

// ProcessMessage validates and enqueues a single message
func (bh *Handler) ProcessMessage(msg *cb.Envelope, addr string) (resp *ab.BroadcastResponse) {tracker := &MetricsTracker{ChannelID: "unknown",TxType:    "unknown",Metrics:   bh.Metrics,}defer func() {// This looks a little unnecessary, but if done directly as// a defer, resp gets the (always nil) current state of resp// and not the return valuetracker.Record(resp)}()tracker.BeginValidate()// 获取通道消息处理器,如果是创建通道,则调用默认的系统通道管理器chdr, isConfig, processor, err := bh.SupportRegistrar.BroadcastChannelSupport(msg)if chdr != nil {tracker.ChannelID = chdr.ChannelIdtracker.TxType = cb.HeaderType(chdr.Type).String()}if err != nil {logger.Warningf("[channel: %s] Could not get message processor for serving %s: %s", tracker.ChannelID, addr, err)return &ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST, Info: err.Error()}}if !isConfig {logger.Debugf("[channel: %s] Broadcast is processing normal message from %s with txid '%s' of type %s", chdr.ChannelId, addr, chdr.TxId, cb.HeaderType_name[chdr.Type])configSeq, err := processor.ProcessNormalMsg(msg)if err != nil {logger.Warningf("[channel: %s] Rejecting broadcast of normal message from %s because of error: %s", chdr.ChannelId, addr, err)return &ab.BroadcastResponse{Status: ClassifyError(err), Info: err.Error()}}tracker.EndValidate()tracker.BeginEnqueue()if err = processor.WaitReady(); err != nil {logger.Warningf("[channel: %s] Rejecting broadcast of message from %s with SERVICE_UNAVAILABLE: rejected by Consenter: %s", chdr.ChannelId, addr, err)return &ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE, Info: err.Error()}}err = processor.Order(msg, configSeq)if err != nil {logger.Warningf("[channel: %s] Rejecting broadcast of normal message from %s with SERVICE_UNAVAILABLE: rejected by Order: %s", chdr.ChannelId, addr, err)return &ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE, Info: err.Error()}}} else { // 对于创建通道,isConfig 值为 true,执行以下逻辑logger.Debugf("[channel: %s] Broadcast is processing config update message from %s", chdr.ChannelId, addr)// 对消息进行重新封装,头部消息类型为ORDERER_TRANSACTION,内层消息类型为CONFIGconfig, configSeq, err := processor.ProcessConfigUpdateMsg(msg)if err != nil {logger.Warningf("[channel: %s] Rejecting broadcast of config message from %s because of error: %s", chdr.ChannelId, addr, err)return &ab.BroadcastResponse{Status: ClassifyError(err), Info: err.Error()}}tracker.EndValidate()tracker.BeginEnqueue()if err = processor.WaitReady(); err != nil {logger.Warningf("[channel: %s] Rejecting broadcast of message from %s with SERVICE_UNAVAILABLE: rejected by Consenter: %s", chdr.ChannelId, addr, err)return &ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE, Info: err.Error()}}// 将配置更新消息进行广播err = processor.Configure(config, configSeq)if err != nil {logger.Warningf("[channel: %s] Rejecting broadcast of config message from %s with SERVICE_UNAVAILABLE: rejected by Configure: %s", chdr.ChannelId, addr, err)return &ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE, Info: err.Error()}}}logger.Debugf("[channel: %s] Broadcast has successfully enqueued message of type %s from %s", chdr.ChannelId, cb.HeaderType_name[chdr.Type], addr)return &ab.BroadcastResponse{Status: cb.Status_SUCCESS}
}

3. 参考资料

  • Fabric创建通道流程解析

这篇关于fabric创建通道过程的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1118329

相关文章

Java 虚拟线程的创建与使用深度解析

《Java虚拟线程的创建与使用深度解析》虚拟线程是Java19中以预览特性形式引入,Java21起正式发布的轻量级线程,本文给大家介绍Java虚拟线程的创建与使用,感兴趣的朋友一起看看吧... 目录一、虚拟线程简介1.1 什么是虚拟线程?1.2 为什么需要虚拟线程?二、虚拟线程与平台线程对比代码对比示例:三

k8s按需创建PV和使用PVC详解

《k8s按需创建PV和使用PVC详解》Kubernetes中,PV和PVC用于管理持久存储,StorageClass实现动态PV分配,PVC声明存储需求并绑定PV,通过kubectl验证状态,注意回收... 目录1.按需创建 PV(使用 StorageClass)创建 StorageClass2.创建 PV

Redis中Hash从使用过程到原理说明

《Redis中Hash从使用过程到原理说明》RedisHash结构用于存储字段-值对,适合对象数据,支持HSET、HGET等命令,采用ziplist或hashtable编码,通过渐进式rehash优化... 目录一、开篇:Hash就像超市的货架二、Hash的基本使用1. 常用命令示例2. Java操作示例三

Linux创建服务使用systemctl管理详解

《Linux创建服务使用systemctl管理详解》文章指导在Linux中创建systemd服务,设置文件权限为所有者读写、其他只读,重新加载配置,启动服务并检查状态,确保服务正常运行,关键步骤包括权... 目录创建服务 /usr/lib/systemd/system/设置服务文件权限:所有者读写js,其他

Redis中Set结构使用过程与原理说明

《Redis中Set结构使用过程与原理说明》本文解析了RedisSet数据结构,涵盖其基本操作(如添加、查找)、集合运算(交并差)、底层实现(intset与hashtable自动切换机制)、典型应用场... 目录开篇:从购物车到Redis Set一、Redis Set的基本操作1.1 编程常用命令1.2 集

Linux下利用select实现串口数据读取过程

《Linux下利用select实现串口数据读取过程》文章介绍Linux中使用select、poll或epoll实现串口数据读取,通过I/O多路复用机制在数据到达时触发读取,避免持续轮询,示例代码展示设... 目录示例代码(使用select实现)代码解释总结在 linux 系统里,我们可以借助 select、

k8s中实现mysql主备过程详解

《k8s中实现mysql主备过程详解》文章讲解了在K8s中使用StatefulSet部署MySQL主备架构,包含NFS安装、storageClass配置、MySQL部署及同步检查步骤,确保主备数据一致... 目录一、k8s中实现mysql主备1.1 环境信息1.2 部署nfs-provisioner1.2.

idea+spring boot创建项目的搭建全过程

《idea+springboot创建项目的搭建全过程》SpringBoot是Spring社区发布的一个开源项目,旨在帮助开发者快速并且更简单的构建项目,:本文主要介绍idea+springb... 目录一.idea四种搭建方式1.Javaidea命名规范2JavaWebTomcat的安装一.明确tomcat

MyBatis/MyBatis-Plus同事务循环调用存储过程获取主键重复问题分析及解决

《MyBatis/MyBatis-Plus同事务循环调用存储过程获取主键重复问题分析及解决》MyBatis默认开启一级缓存,同一事务中循环调用查询方法时会重复使用缓存数据,导致获取的序列主键值均为1,... 目录问题原因解决办法如果是存储过程总结问题myBATis有如下代码获取序列作为主键IdMappe

linux部署NFS和autofs自动挂载实现过程

《linux部署NFS和autofs自动挂载实现过程》文章介绍了NFS(网络文件系统)和Autofs的原理与配置,NFS通过RPC实现跨系统文件共享,需配置/etc/exports和nfs.conf,... 目录(一)NFS1. 什么是NFS2.NFS守护进程3.RPC服务4. 原理5. 部署5.1安装NF