"BOKU"のITな日常

62歳・文系システムエンジニアの”BOKU”は日々勉強を楽しんでます

Pythonでマイブロックチェーンに改ざんチェックを追加実装する/ブロックチェーンの勉強二回目

ブロックチェーン技術の勉強に、マイブロックチェーンpythonで実装を試みている第二回目です。

f:id:arakan_no_boku:20200130223854p:plain

 

はじめに

 

ブロックチェーンについて勉強したことの整理を兼ねて、Pythonで実装してみます。

前回は「一回目:ブロックチェーンの骨格を作る」でした。

arakan-pgm-ai.hatenablog.com

今回は、「二回目:保存や改ざんチェックなどの機能を付け加えてみる」をやります。

一回目で作成した「MyBlockChain」クラスをベースに、機能追加をしていきます。

 

今回追加する機能の仕様について

 

一回目の実装は骨格だけです。

なので、クラス生成時に空の「chain」を初期化して、毎回一からブロックチェインを形成して、それを表示するだけの機能しかありませんでした。

でも。

やっぱり、ファイルへの保存と読み込み位は最低限できないと厳しいです。 

当然。

読み込む時にはブロックチェインの改ざんチェックを行い、問題のあるブロックは読み込まない工夫も必要です。

とはいえ。

別に暗号通貨みたいな仕組を作りたいわけではありません。

とりあえず、実装していくのは。

あたりですかね。

 

構築したブロックチェーントランザクションとハッシュをJSONで保存する。 

 

該当部分のソースはこんな感じにしました。

 def save(self, outfile="chain00.json"):
        save_arr = []
        for i, x in enumerate(self.chain):
            if i > 0:
                save_dic = {}
                save_dic['id'] = x['block_index']
                save_dic['hash'] = x['block_header']['tran_hash']
                save_dic['input'] = x['tran_body']['input']
                save_dic['output'] = x['tran_body']['output']
                save_arr.append(save_dic)
        fw = open(outfile, 'w')
        json.dump(save_arr, fw, indent=2)

単純に構築したブロックチェーンをそのままJSONに出力してません。

MyBlockChainで構築するブロックの構造はこんな感じです。

block = {
    'block_index': ブロックID,
    'block_time': ブロック作成時刻,
    'block_header': {
        'prev_hash': 前のブロックから引き継いだハッシュ値,
        'tran_hash': このブロックのトランザクションから計算したハッシュ値,
    },
    'tran_counter': トランザクション(tran_body)に含まれるデータ数,
    'tran_body': トランザクションデータ(入力・出力),
}

なので、JSONファイルに出力しているのは、その一部(トランザクションデータとハッシュ値とID)だけです。 

かつ。

先頭ブロックは出力しない(if i>0)ようにしてます。

こうしている理由は読み込み時のチェックのところで整理します。

 

保存したJSONファイルから読み込んでブロックチェーンを展開する。

 

前の処理で保存したJSONファイルを読み込んで、マイブロックチェーンを再構築する処理です。

該当部分のソースはこんな感じです。

    def __init__(self, loadfile="chain00.json"):
        self.chain = []
        self.testkey = "a6cfb4c140cf8cf2b281d5538961cdcfaca30a464befc02bf5af3bc7397cd4d7"
        # 最初のブロックだけは固定で作る
        self.add_new_block(0, {'747bc42088cf0b3915982af289189e8f': 0}, {
                           '14d3325a7d594bc2d30a7014a536cb13': 0}, self.testkey)
        try:
            with open(loadfile, "r") as fr:
                loadblocks = json.load(fr)
                for bck in loadblocks:
                    isOK = self.add_new_block(
                        bck['id'], bck['input'], bck['output'], bck['hash'])
                    if not isOK:
                        print("ブロック構築を異常終了します。")
                        break
        except BaseException:
            pass

補足します。

第一ブロックはファイルに出力せず、固定で生成して、ファイルから読み込んだブロックは2ブロック目以降に順次読み込むようにしています。

これはファイルの改ざん防止+間違ったファイルを読み込むミスの防止策です。

第一ブロックのハッシュはプログラム内で生成され、ファイルから読み込んだトランザクションのハッシュと組合せて、ハッシュを再計算します。

それがファイルから読み込んだハッシュ値と一致しなければ「間違ったファイル」または「改ざんされたファイル」と判断する・・というわけです。

 

ファイルから読み込む時に改ざんチェックし、改ざんを発見したら処理を止める

 

前段で「ファイルから読み込む最初のブロック」の判定をしてます。

こちらは、その判定を潜り抜けた後のブロックの改ざんチェックを行う部分です。

該当部分のソースです。

    def add_new_block(self, id, inp, outp, check_hash):
        # トランザクションを生成する
        new_transaction = self.__create_new_transaction(inp, outp)

        # 前のブロックのハッシュを取得。最初だけ固定値
        if len(self.chain) > 0:
            prev_hash = self.chain[-1]['block_header']['tran_hash']
        else:
            prev_hash = "747bc42088cf0b3915982af289189e8f14d3325a7d594bc2d30a7014a536cb13"

        check_ok = False
        # このブロックのハッシュ値を計算する
        tran_hash = self.__hash(
            prev_hash + self.__calc_tran_hash(new_transaction))
        # テストデータを作るための認証キーをとりあえず固定にしておく
        if check_hash == self.testkey:
            check_ok = True
        else:
            if check_hash == tran_hash:
                check_ok = True

        if check_ok:
            # トランザクションを元にブロックを生成して、チェーンに接続する
            new_block = {
                'block_index': len(self.chain) + 1,
                'block_time': dt.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
                'block_header': {
                    'prev_hash': prev_hash,
                    'tran_hash': tran_hash,
                },
                'tran_counter': len(inp) + len(outp),
                'tran_body': new_transaction,
            }
            self.chain.append(new_block)
        else:
            print("要注意「", id, "」のトランザクションで改ざんの疑いがあります。!")
            return False

        return True

やってることはたいしたことありません。 

JSONファイルのトランザクションハッシュを引数でうけとって、ブロック追加時に計算したハッシュと比較して、一致しないとエラーにしているだけです。

 

改造版MyBlockChain

 

前回のMyBlockChainに保存・読み込み・改ざんチェックを付け加えました。

暗号通貨とかで利用されているブロックチェーンのような「ブロック追加時のマイニング手順」を実装しない、シンプルなブロックチェーンとなってます。

ソース全文です。

import datetime as dt
import json
import hashlib


class MyBlockChain(object):
    # ブロックチェーンを初期化する
    def __init__(self, loadfile="chain00.json"):
        self.chain = []
        # テストデータ作成用キー(固定)
        self.testkey = "a6cfb4c140cf8cf2b281d5538961cdcfaca30a464befc02bf5af3bc7397cd4d7"
        # 最初のブロックだけは固定で作る
        self.add_new_block(0, {'747bc42088cf0b3915982af289189e8f': 0}, {
                           '14d3325a7d594bc2d30a7014a536cb13': 0}, self.testkey)
        # JSONファイルからブロックデータを読み込んで、チェインに追加する
        try:
            with open(loadfile, "r") as fr:
                loadblocks = json.load(fr)
                for bck in loadblocks:
                    isOK = self.add_new_block(
                        bck['id'], bck['input'], bck['output'], bck['hash'])
                    if not isOK:
                        print("ブロック構築を異常終了します。")
                        break
        except BaseException:
            pass

    # 新しいブロックを作成する
    def add_new_block(self, id, inp, outp, check_hash):
        # トランザクションを生成する
        new_transaction = self.__create_new_transaction(inp, outp)

        # 前のブロックのハッシュを取得。最初だけ固定値
        if len(self.chain) > 0:
            prev_hash = self.chain[-1]['block_header']['tran_hash']
        else:
            prev_hash = "747bc42088cf0b3915982af289189e8f14d3325a7d594bc2d30a7014a536cb13"

        check_ok = False
        # このブロックのハッシュ値を計算する
        tran_hash = self.__hash(
            prev_hash + self.__calc_tran_hash(new_transaction))
        # テストデータを作るためのとりあえず処理
        if check_hash == self.testkey:
            check_ok = True
        else:
            # JSONファイルから読み込んだブロックの検査
            if check_hash == tran_hash:
                check_ok = True

        if check_ok:
            # トランザクションを元にブロックを生成して、チェーンに接続する
            new_block = {
                'block_index': len(self.chain) + 1,
                'block_time': dt.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
                'block_header': {
                    'prev_hash': prev_hash,
                    'tran_hash': tran_hash,
                },
                'tran_counter': len(inp) + len(outp),
                'tran_body': new_transaction,
            }
            self.chain.append(new_block)
        else:
            print("要注意「", id, "」のトランザクションで改ざんの疑いがあります。!")
            return False

        return True

    # 新しいトランザクションを生成する
    def __create_new_transaction(self, inp, outp):
        new_transaction = {
            'input': inp,
            'output': outp,
        }
        return new_transaction

    # ハッシュ値を計算する。SortをTrueにしているのはハッシュの整合性維持のため
    def __calc_tran_hash(self, new_transaction):
        tran_string = json.dumps(new_transaction, sort_keys=True).encode()
        return self.__hash(tran_string)

    # ハッシュ計算内部関数
    def __hash(self, str_seed):
        return hashlib.sha256(str(str_seed).encode()).hexdigest()

    # JSONファイルにブロックチェインの主要要素のみを保存する
    def save(self, outfile="chain00.json"):
        save_arr = []
        for i, x in enumerate(self.chain):
            if i > 0:
                save_dic = {}
                save_dic['id'] = x['block_index']
                save_dic['hash'] = x['block_header']['tran_hash']
                save_dic['input'] = x['tran_body']['input']
                save_dic['output'] = x['tran_body']['output']
                save_arr.append(save_dic)
        fw = open(outfile, 'w')
        json.dump(save_arr, fw, indent=2)

    # ブロックの内容を表示する
    def dump(self, block_index=0):
        if(block_index == 0):
            print(json.dumps(self.chain, sort_keys=False, indent=2))
        else:
            print(
                json.dumps(
                    self.chain[block_index],
                    sort_keys=False,
                    indent=2))

    # テスト用のブロックを強制的に作るためのテストキーを返す
    def get_testkey(self):
        return str(self.testkey)

前回のクラスに上記で説明した部分を加えただけなので、補足説明はありません。

 

実行してみます

 

テスト用に以下のJSONファイルを用意します。

[
  {
    "id": 2,
    "hash": "f31daf4fa73faa5d890cbc0d8ad285ec6dcddf64c0236fc726124e922552126b",
    "input": {
      "Y3XNSHTT": 12345
    },
    "output": {
      "X2VBASDD": 10000,
      "Z45UHRKL": 2345
    }
  },
  {
    "id": 3,
    "hash": "7eca4ab3c0dd2b825d7c7568f324f3f7a7b4d883e6c9e85d5028bfcb825b470c",
    "input": {
      "X2VBASDD": 22345
    },
    "output": {
      "Y3XNSHTT": 10000,
      "Z45UHRKL": 12345
    }
  },
  {
    "id": 4,
    "hash": "e582060c816f5930fc9874f7a8f18eeb25ccf053d40a84d2a2bb088c64cc9f1a",
    "input": {
      "Z45UHRKL": 23450
    },
    "output": {
      "Y3XNSHTT": 13000,
      "X2VBASDD": 10450
    }
  }
]

これを「chain00.json」のファイル名でPythonスクリプトと同じフォルダにおきます。 

実行するソースは2行だけ。

bc = MyBlockChain()
bc.dump()

 これを実行すると、以下のような結果を表示します。

[
  {
    "block_index": 1,
    "block_time": "2020-02-08 18:14:06",
    "block_header": {
      "prev_hash": "747bc42088cf0b3915982af289189e8f14d3325a7d594bc2d30a7014a536cb13",
      "tran_hash": "ec3400824e188b13a5ca7821d180fe4084e120fd02a6d0e14e6848ae070b29c8"
    },
    "tran_counter": 2,
    "tran_body": {
      "input": {
        "747bc42088cf0b3915982af289189e8f": 0
      },
      "output": {
        "14d3325a7d594bc2d30a7014a536cb13": 0
      }
    }
  },
  {
    "block_index": 2,
    "block_time": "2020-02-08 18:14:06",
    "block_header": {
      "prev_hash": "ec3400824e188b13a5ca7821d180fe4084e120fd02a6d0e14e6848ae070b29c8",
      "tran_hash": "f31daf4fa73faa5d890cbc0d8ad285ec6dcddf64c0236fc726124e922552126b"
    },
    "tran_counter": 3,
    "tran_body": {
      "input": {
        "Y3XNSHTT": 12345
      },
      "output": {
        "X2VBASDD": 10000,
        "Z45UHRKL": 2345
      }
    }
  },
  {
    "block_index": 3,
    "block_time": "2020-02-08 18:14:06",
    "block_header": {
      "prev_hash": "f31daf4fa73faa5d890cbc0d8ad285ec6dcddf64c0236fc726124e922552126b",
      "tran_hash": "7eca4ab3c0dd2b825d7c7568f324f3f7a7b4d883e6c9e85d5028bfcb825b470c"
    },
    "tran_counter": 3,
    "tran_body": {
      "input": {
        "X2VBASDD": 22345
      },
      "output": {
        "Y3XNSHTT": 10000,
        "Z45UHRKL": 12345
      }
    }
  },
  {
    "block_index": 4,
    "block_time": "2020-02-08 18:14:06",
    "block_header": {
      "prev_hash": "7eca4ab3c0dd2b825d7c7568f324f3f7a7b4d883e6c9e85d5028bfcb825b470c",
      "tran_hash": "e582060c816f5930fc9874f7a8f18eeb25ccf053d40a84d2a2bb088c64cc9f1a"
    },
    "tran_counter": 3,
    "tran_body": {
      "input": {
        "Z45UHRKL": 23450
      },
      "output": {
        "Y3XNSHTT": 13000,
        "X2VBASDD": 10450
      }
    }
  }
]

ちゃんとJSONファイルを読み込んでます。 

 

改ざんチェックをやってみる

 

今度はJSONファイルの一部を改ざんしてみます。

ブロックの最後の

"X2VBASDD": 10450

であるところを

"X2VBASDD": 10451

と数字を1だけ変更してみます。

これで実行すると・・。

要注意「 4 」のトランザクションで改ざんの疑いがあります。!
ブロック構築を異常終了します。

おお・・、ちゃんと検知しました。

OKじゃないでしょうか。

今回はこんなところで。

ではでは。