• R/O
  • HTTP
  • SSH
  • HTTPS

vapor: 提交

Golang implemented sidechain for Bytom


Commit MetaInfo

修订版d01a1668d0ba642accd74d1bda7a4e280c3e202e (tree)
时间2019-11-21 16:56:46
作者paladz <453256728@qq.c...>
Commiterpaladz

Log Message

set proposal have timeout

更改概述

差异

--- a/application/mov/mov_core.go
+++ b/application/mov/mov_core.go
@@ -76,7 +76,7 @@ func (m *MovCore) ApplyBlock(block *types.Block) error {
7676 become an infinite loop and DDoS attacks the whole network?
7777 */
7878 // BeforeProposalBlock return all transactions than can be matched, and the number of transactions cannot exceed the given capacity.
79-func (m *MovCore) BeforeProposalBlock(txs []*types.Tx, nodeProgram []byte, blockHeight uint64, gasLeft int64) ([]*types.Tx, int64, error) {
79+func (m *MovCore) BeforeProposalBlock(txs []*types.Tx, nodeProgram []byte, blockHeight uint64, gasLeft int64, isTimeout func() bool) ([]*types.Tx, int64, error) {
8080 if blockHeight <= m.startBlockHeight {
8181 return nil, 0, nil
8282 }
@@ -91,7 +91,7 @@ func (m *MovCore) BeforeProposalBlock(txs []*types.Tx, nodeProgram []byte, block
9191 tradePairIterator := database.NewTradePairIterator(m.movStore)
9292
9393 var packagedTxs []*types.Tx
94- for gasLeft > 0 && tradePairIterator.HasNext() {
94+ for gasLeft > 0 && !isTimeout() && tradePairIterator.HasNext() {
9595 tradePair := tradePairIterator.Next()
9696 if tradePairMap[tradePair.Key()] {
9797 continue
@@ -99,7 +99,7 @@ func (m *MovCore) BeforeProposalBlock(txs []*types.Tx, nodeProgram []byte, block
9999 tradePairMap[tradePair.Key()] = true
100100 tradePairMap[tradePair.Reverse().Key()] = true
101101
102- for gasLeft > 0 && matchEngine.HasMatchedTx(tradePair, tradePair.Reverse()) {
102+ for gasLeft > 0 && !isTimeout() && matchEngine.HasMatchedTx(tradePair, tradePair.Reverse()) {
103103 matchedTx, err := matchEngine.NextMatchedTx(tradePair, tradePair.Reverse())
104104 if err != nil {
105105 return nil, 0, err
--- a/proposal/blockproposer/blockproposer.go
+++ b/proposal/blockproposer/blockproposer.go
@@ -16,7 +16,8 @@ import (
1616 )
1717
1818 const (
19- logModule = "blockproposer"
19+ logModule = "blockproposer"
20+ timeProportionDenominator = 3
2021 )
2122
2223 // BlockProposer propose several block in specified time range
@@ -74,7 +75,8 @@ func (b *BlockProposer) generateBlocks() {
7475 continue
7576 }
7677
77- block, err := proposal.NewBlockTemplate(b.chain, b.accountManager, nextBlockTime)
78+ timeoutDuration := time.Duration(consensus.ActiveNetParams.BlockTimeInterval/timeProportionDenominator) * time.Millisecond
79+ block, err := proposal.NewBlockTemplate(b.chain, b.accountManager, nextBlockTime, timeoutDuration)
7880 if err != nil {
7981 log.WithFields(log.Fields{"module": logModule, "error": err}).Error("failed on create NewBlockTemplate")
8082 continue
--- a/proposal/proposal.go
+++ b/proposal/proposal.go
@@ -20,25 +20,176 @@ import (
2020 )
2121
2222 const (
23- logModule = "mining"
23+ logModule = "mining"
24+ batchApplyNum = 64
2425 )
2526
27+type blockBuilder struct {
28+ chain *protocol.Chain
29+ accountManager *account.Manager
30+
31+ block *types.Block
32+ txStatus *bc.TransactionStatus
33+ utxoView *state.UtxoViewpoint
34+
35+ timeoutCh <-chan time.Time
36+ gasLeft int64
37+ timeoutFlag bool
38+}
39+
40+func newBlockBuilder(chain *protocol.Chain, accountManager *account.Manager, timestamp uint64, timeoutDuration time.Duration) *blockBuilder {
41+ preBlockHeader := chain.BestBlockHeader()
42+ block := &types.Block{
43+ BlockHeader: types.BlockHeader{
44+ Version: 1,
45+ Height: preBlockHeader.Height + 1,
46+ PreviousBlockHash: preBlockHeader.Hash(),
47+ Timestamp: timestamp,
48+ BlockCommitment: types.BlockCommitment{},
49+ BlockWitness: types.BlockWitness{Witness: make([][]byte, consensus.ActiveNetParams.NumOfConsensusNode)},
50+ },
51+ }
52+
53+ builder := &blockBuilder{
54+ chain: chain,
55+ accountManager: accountManager,
56+ block: block,
57+ txStatus: bc.NewTransactionStatus(),
58+ utxoView: state.NewUtxoViewpoint(),
59+ timeoutCh: time.After(timeoutDuration),
60+ gasLeft: int64(consensus.ActiveNetParams.MaxBlockGas),
61+ }
62+ return builder
63+}
64+
65+func (b *blockBuilder) applyCoinbaseTransaction() error {
66+ coinbaseTx, err := b.createCoinbaseTx()
67+ if err != nil {
68+ return errors.Wrap(err, "fail on create coinbase tx")
69+ }
70+
71+ gasState, err := validation.ValidateTx(coinbaseTx.Tx, &bc.Block{BlockHeader: &bc.BlockHeader{Height: b.block.Height}, Transactions: []*bc.Tx{coinbaseTx.Tx}})
72+ if err != nil {
73+ return err
74+ }
75+
76+ b.block.Transactions = append(b.block.Transactions, coinbaseTx)
77+ if err := b.txStatus.SetStatus(0, false); err != nil {
78+ return err
79+ }
80+
81+ b.gasLeft -= gasState.GasUsed
82+ return nil
83+}
84+func (b *blockBuilder) applyTransactions(txs []*types.Tx) error {
85+ tempTxs := []*types.Tx{}
86+ for i := 0; i < len(txs); i++ {
87+ if tempTxs = append(tempTxs, txs[i]); len(tempTxs) < batchApplyNum && i != len(txs)-1 {
88+ continue
89+ }
90+
91+ results, gasLeft := preValidateTxs(tempTxs, b.chain, b.utxoView, b.gasLeft)
92+ for _, result := range results {
93+ if result.err != nil && !result.gasOnly {
94+ blkGenSkipTxForErr(b.chain.GetTxPool(), &result.tx.ID, result.err)
95+ continue
96+ }
97+
98+ if err := b.txStatus.SetStatus(len(b.block.Transactions), result.gasOnly); err != nil {
99+ return err
100+ }
101+
102+ b.block.Transactions = append(b.block.Transactions, result.tx)
103+ }
104+
105+ b.gasLeft = gasLeft
106+ tempTxs = []*types.Tx{}
107+ if b.isTimeout() {
108+ break
109+ }
110+ }
111+ return nil
112+}
113+
114+func (b *blockBuilder) applyTransactionFromPool() error {
115+ txDescList := b.chain.GetTxPool().GetTransactions()
116+ sort.Sort(byTime(txDescList))
117+
118+ poolTxs := make([]*types.Tx, len(txDescList))
119+ for i, txDesc := range txDescList {
120+ poolTxs[i] = txDesc.Tx
121+ }
122+
123+ return b.applyTransactions(poolTxs)
124+}
125+
126+func (b *blockBuilder) applyTransactionFromSubProtocol() error {
127+ cp, err := b.accountManager.GetCoinbaseControlProgram()
128+ if err != nil {
129+ return err
130+ }
131+
132+ for i, p := range b.chain.SubProtocols() {
133+ if b.gasLeft <= 0 || b.isTimeout() {
134+ break
135+ }
136+
137+ subTxs, _, err := p.BeforeProposalBlock(b.block.Transactions, cp, b.block.Height, b.gasLeft, b.isTimeout)
138+ if err != nil {
139+ log.WithFields(log.Fields{"module": logModule, "index": i, "error": err}).Error("failed on sub protocol txs package")
140+ continue
141+ }
142+
143+ if err := b.applyTransactions(subTxs); err != nil {
144+ return err
145+ }
146+ }
147+ return nil
148+}
149+
150+func (b *blockBuilder) calcBlockCommitment() (err error) {
151+ var txEntries []*bc.Tx
152+ for _, tx := range b.block.Transactions {
153+ txEntries = append(txEntries, tx.Tx)
154+ }
155+
156+ b.block.BlockHeader.BlockCommitment.TransactionsMerkleRoot, err = types.TxMerkleRoot(txEntries)
157+ if err != nil {
158+ return err
159+ }
160+
161+ b.block.BlockHeader.BlockCommitment.TransactionStatusHash, err = types.TxStatusMerkleRoot(b.txStatus.VerifyStatus)
162+ return err
163+}
164+
26165 // createCoinbaseTx returns a coinbase transaction paying an appropriate subsidy
27166 // based on the passed block height to the provided address. When the address
28167 // is nil, the coinbase transaction will instead be redeemable by anyone.
29-func createCoinbaseTx(accountManager *account.Manager, chain *protocol.Chain, preBlockHeader *types.BlockHeader) (tx *types.Tx, err error) {
30- preBlockHash := preBlockHeader.Hash()
31- consensusResult, err := chain.GetConsensusResultByHash(&preBlockHash)
168+func (b *blockBuilder) createCoinbaseTx() (*types.Tx, error) {
169+ consensusResult, err := b.chain.GetConsensusResultByHash(&b.block.PreviousBlockHash)
32170 if err != nil {
33171 return nil, err
34172 }
35173
36- rewards, err := consensusResult.GetCoinbaseRewards(preBlockHeader.Height)
174+ rewards, err := consensusResult.GetCoinbaseRewards(b.block.Height - 1)
37175 if err != nil {
38176 return nil, err
39177 }
40178
41- return createCoinbaseTxByReward(accountManager, preBlockHeader.Height + 1, rewards)
179+ return createCoinbaseTxByReward(b.accountManager, b.block.Height, rewards)
180+}
181+
182+func (b *blockBuilder) isTimeout() bool {
183+ if b.timeoutFlag {
184+ return true
185+ }
186+
187+ select {
188+ case <-b.timeoutCh:
189+ b.timeoutFlag = true
190+ default:
191+ }
192+ return b.timeoutFlag
42193 }
43194
44195 func createCoinbaseTxByReward(accountManager *account.Manager, blockHeight uint64, rewards []state.CoinbaseReward) (tx *types.Tx, err error) {
@@ -91,113 +242,26 @@ func createCoinbaseTxByReward(accountManager *account.Manager, blockHeight uint6
91242 }
92243
93244 // NewBlockTemplate returns a new block template that is ready to be solved
94-func NewBlockTemplate(chain *protocol.Chain, accountManager *account.Manager, timestamp uint64) (*types.Block, error) {
95- block := createBasicBlock(chain, timestamp)
96-
97- view := state.NewUtxoViewpoint()
98- txStatus := bc.NewTransactionStatus()
99-
100- gasLeft, err := applyCoinbaseTransaction(chain, block, txStatus, accountManager, int64(consensus.ActiveNetParams.MaxBlockGas))
101- if err != nil {
245+func NewBlockTemplate(chain *protocol.Chain, accountManager *account.Manager, timestamp uint64, timeoutDuration time.Duration) (*types.Block, error) {
246+ builder := newBlockBuilder(chain, accountManager, timestamp, timeoutDuration)
247+ if err := builder.applyCoinbaseTransaction(); err != nil {
102248 return nil, err
103249 }
104250
105- gasLeft, err = applyTransactionFromPool(chain, view, block, txStatus, gasLeft)
106- if err != nil {
107- return nil, err
108- }
109-
110- if err := applyTransactionFromSubProtocol(chain, view, block, txStatus, accountManager, gasLeft); err != nil {
251+ if err := builder.applyTransactionFromPool(); err != nil {
111252 return nil, err
112253 }
113254
114- var txEntries []*bc.Tx
115- for _, tx := range block.Transactions {
116- txEntries = append(txEntries, tx.Tx)
117- }
118-
119- block.BlockHeader.BlockCommitment.TransactionsMerkleRoot, err = types.TxMerkleRoot(txEntries)
120- if err != nil {
255+ if err := builder.applyTransactionFromSubProtocol(); err != nil {
121256 return nil, err
122257 }
123258
124- block.BlockHeader.BlockCommitment.TransactionStatusHash, err = types.TxStatusMerkleRoot(txStatus.VerifyStatus)
125-
126- _, err = chain.SignBlock(block)
127- return block, err
128-}
129-
130-func createBasicBlock(chain *protocol.Chain, timestamp uint64) *types.Block {
131- preBlockHeader := chain.BestBlockHeader()
132- return &types.Block{
133- BlockHeader: types.BlockHeader{
134- Version: 1,
135- Height: preBlockHeader.Height + 1,
136- PreviousBlockHash: preBlockHeader.Hash(),
137- Timestamp: timestamp,
138- BlockCommitment: types.BlockCommitment{},
139- BlockWitness: types.BlockWitness{Witness: make([][]byte, consensus.ActiveNetParams.NumOfConsensusNode)},
140- },
141- }
142-}
143-
144-func applyCoinbaseTransaction(chain *protocol.Chain, block *types.Block, txStatus *bc.TransactionStatus, accountManager *account.Manager, gasLeft int64) (int64, error) {
145- coinbaseTx, err := createCoinbaseTx(accountManager, chain, chain.BestBlockHeader())
146- if err != nil {
147- return 0, errors.Wrap(err, "fail on create coinbase tx")
148- }
149-
150- gasState, err := validation.ValidateTx(coinbaseTx.Tx, &bc.Block{BlockHeader: &bc.BlockHeader{Height: chain.BestBlockHeight() + 1}, Transactions: []*bc.Tx{coinbaseTx.Tx}})
151- if err != nil {
152- return 0, err
153- }
154-
155- block.Transactions = append(block.Transactions, coinbaseTx)
156- if err := txStatus.SetStatus(0, false); err != nil {
157- return 0, err
158- }
159-
160- return gasLeft - gasState.GasUsed, nil
161-}
162-
163-
164-func applyTransactionFromPool(chain *protocol.Chain, view *state.UtxoViewpoint, block *types.Block, txStatus *bc.TransactionStatus, gasLeft int64) (int64, error) {
165- poolTxs := getAllTxsFromPool(chain.GetTxPool())
166- results, gasLeft := preValidateTxs(poolTxs, chain, view, gasLeft)
167- for _, result := range results {
168- if result.err != nil && !result.gasOnly {
169- blkGenSkipTxForErr(chain.GetTxPool(), &result.tx.ID, result.err)
170- continue
171- }
172-
173- if err := txStatus.SetStatus(len(block.Transactions), result.gasOnly); err != nil {
174- return 0, err
175- }
176-
177- block.Transactions = append(block.Transactions, result.tx)
178- }
179- return gasLeft, nil
180-}
181-
182-func applyTransactionFromSubProtocol(chain *protocol.Chain, view *state.UtxoViewpoint, block *types.Block, txStatus *bc.TransactionStatus, accountManager *account.Manager, gasLeft int64) error {
183- txs, err := getTxsFromSubProtocols(chain, accountManager, block.Transactions, gasLeft)
184- if err != nil {
185- return err
259+ if err := builder.calcBlockCommitment(); err != nil {
260+ return nil, err
186261 }
187262
188- results, gasLeft := preValidateTxs(txs, chain, view, gasLeft)
189- for _, result := range results {
190- if result.err != nil {
191- return err
192- }
193-
194- if err := txStatus.SetStatus(len(block.Transactions), result.gasOnly); err != nil {
195- return err
196- }
197-
198- block.Transactions = append(block.Transactions, result.tx)
199- }
200- return nil
263+ _, err := builder.chain.SignBlock(builder.block)
264+ return builder.block, err
201265 }
202266
203267 type validateTxResult struct {
@@ -262,41 +326,6 @@ func validateBySubProtocols(tx *types.Tx, statusFail bool, subProtocols []protoc
262326 return nil
263327 }
264328
265-func getAllTxsFromPool(txPool *protocol.TxPool) []*types.Tx {
266- txDescList := txPool.GetTransactions()
267- sort.Sort(byTime(txDescList))
268-
269- poolTxs := make([]*types.Tx, len(txDescList))
270- for i, txDesc := range txDescList {
271- poolTxs[i] = txDesc.Tx
272- }
273- return poolTxs
274-}
275-
276-func getTxsFromSubProtocols(chain *protocol.Chain, accountManager *account.Manager, poolTxs []*types.Tx, gasLeft int64) ([]*types.Tx, error) {
277- cp, err := accountManager.GetCoinbaseControlProgram()
278- if err != nil {
279- return nil, err
280- }
281-
282- var result []*types.Tx
283- var subTxs []*types.Tx
284- for i, p := range chain.SubProtocols() {
285- if gasLeft <= 0 {
286- break
287- }
288-
289- subTxs, gasLeft, err = p.BeforeProposalBlock(poolTxs, cp, chain.BestBlockHeight() + 1, gasLeft)
290- if err != nil {
291- log.WithFields(log.Fields{"module": logModule, "index": i, "error": err}).Error("failed on sub protocol txs package")
292- continue
293- }
294-
295- result = append(result, subTxs...)
296- }
297- return result, nil
298-}
299-
300329 func blkGenSkipTxForErr(txPool *protocol.TxPool, txHash *bc.Hash, err error) {
301330 log.WithFields(log.Fields{"module": logModule, "error": err}).Error("mining block generation: skip tx due to")
302331 txPool.RemoveTransaction(txHash)
--- a/protocol/protocol.go
+++ b/protocol/protocol.go
@@ -22,7 +22,7 @@ const (
2222 type Protocoler interface {
2323 Name() string
2424 StartHeight() uint64
25- BeforeProposalBlock(txs []*types.Tx, nodeProgram []byte, blockHeight uint64, gasLeft int64) ([]*types.Tx, int64, error)
25+ BeforeProposalBlock(txs []*types.Tx, nodeProgram []byte, blockHeight uint64, gasLeft int64, isTimeout func() bool) ([]*types.Tx, int64, error)
2626 ChainStatus() (uint64, *bc.Hash, error)
2727 ValidateBlock(block *types.Block, verifyResults []*bc.TxVerifyResult) error
2828 ValidateTxs(txs []*types.Tx, verifyResults []*bc.TxVerifyResult) error
@@ -238,7 +238,7 @@ func (c *Chain) syncProtocolStatus(subProtocol Protocoler) error {
238238 return errors.Wrap(err, subProtocol.Name(), "sub protocol detach block err")
239239 }
240240
241- protocolHeight, protocolHash = block.Height -1, &block.PreviousBlockHash
241+ protocolHeight, protocolHash = block.Height-1, &block.PreviousBlockHash
242242 }
243243
244244 for height := protocolHeight + 1; height <= c.BestBlockHeight(); height++ {
--- a/protocol/validation/tx.go
+++ b/protocol/validation/tx.go
@@ -3,6 +3,7 @@ package validation
33 import (
44 "fmt"
55 "math"
6+ "runtime"
67 "sync"
78
89 "github.com/vapor/common"
@@ -14,10 +15,6 @@ import (
1415 "github.com/vapor/protocol/vm"
1516 )
1617
17-const (
18- validateWorkerNum = 32
19-)
20-
2118 // validate transaction error
2219 var (
2320 ErrTxVersion = errors.New("invalid transaction version")
@@ -665,6 +662,7 @@ func validateTxWorker(workCh chan *validateTxWork, resultCh chan *ValidateTxResu
665662 // ValidateTxs validates txs in async mode
666663 func ValidateTxs(txs []*bc.Tx, block *bc.Block) []*ValidateTxResult {
667664 txSize := len(txs)
665+ validateWorkerNum := runtime.NumCPU()
668666 //init the goroutine validate worker
669667 var wg sync.WaitGroup
670668 workCh := make(chan *validateTxWork, txSize)
--- a/test/bench_blockchain_test.go
+++ b/test/bench_blockchain_test.go
@@ -159,7 +159,7 @@ func InsertChain(chain *protocol.Chain, txPool *protocol.TxPool, txs []*types.Tx
159159 }
160160 }
161161
162- block, err := proposal.NewBlockTemplate(chain, nil, uint64(time.Now().UnixNano()/1e6))
162+ block, err := proposal.NewBlockTemplate(chain, nil, uint64(time.Now().UnixNano()/1e6), time.Minute)
163163 if err != nil {
164164 return err
165165 }
--- a/test/performance/mining_test.go
+++ b/test/performance/mining_test.go
@@ -26,6 +26,6 @@ func BenchmarkNewBlockTpl(b *testing.B) {
2626
2727 b.ResetTimer()
2828 for i := 0; i < b.N; i++ {
29- proposal.NewBlockTemplate(chain, accountManager, uint64(time.Now().UnixNano()/1e6))
29+ proposal.NewBlockTemplate(chain, accountManager, uint64(time.Now().UnixNano()/1e6), time.Minute)
3030 }
3131 }
Show on old repository browser