#!/usr/bin/env python3 import argparse import plyvel import iavltree import json def decode_protobuf(subformats: dict, format_prefix: str, data: bytes): result = [] for (k,v) in iavltree.parse_pb(data): idx = f'{format_prefix}.{k}' if idx in subformats: f = subformats[idx] if f == 'str': decoded_value = v.decode('utf-8') elif f == 'int': decoded_value = int(v) elif f == 'float': decoded_value = float(v) elif f == 'proto': decoded_value = decode_protobuf(subformats, idx, v) elif f == 'protodict': decoded_value = dict(decode_protobuf(subformats, idx, v)) else: decoded_value = v else: decoded_value = v result.append((k, decoded_value)) return result def decode_output(format: str, data: bytes) -> str: if format == 'str': return data.decode('utf-8') elif format == 'int': return int(data) elif format == 'float': return float(data) elif format.startswith('protodict'): subformats = {'.' + id: subformat for x in format.split(',')[1:] for id, subformat in (x.split('='),)} return dict(decode_protobuf(subformats, '', data)) elif format.startswith('proto'): subformats = {'.' + id: subformat for x in format.split(',')[1:] for id, subformat in (x.split('='),)} return decode_protobuf(subformats, '', data) else: return data def get_args(): parser = argparse.ArgumentParser(description="Read the IAVL tree in a cosmos snapshot") parser.add_argument('-d', '--database', help='Path to database (application.db folder)') parser.add_argument('-H', '--height', type=int, help='Block height') parser.add_argument('-k', '--keyformat', help='Key format for maps (e.g. Qss)') parser.add_argument('-v', '--valueformat', help='Value format') subparsers = parser.add_subparsers(required=True, dest='cmd') p_max_height = subparsers.add_parser('max_height', help = 'Get the max block height in the snapshot') p_get = subparsers.add_parser('get', help = 'Retrieve a single item') p_get.add_argument('prefix', help = 'Prefix (e.g. "s/k:emissions/")') p_get.add_argument('key', nargs='+', help = 'Key parts') p_count = subparsers.add_parser('count', help = 'Count number of items with a prefix') p_count.add_argument('prefix', help = 'Prefix (e.g. "s/k:emissions/")') p_count.add_argument('key', nargs='*', help = 'Key parts') p_iterate = subparsers.add_parser('iterate', help = 'Iterate over items with some prefix') p_iterate.add_argument('prefix', help = 'Prefix (e.g. "s/k:emissions/")') p_iterate.add_argument('key', nargs='*', help = 'Key parts') p_iterate = subparsers.add_parser('iterate_keys', help = 'Iterate over items with some prefix, output keys only') p_iterate.add_argument('prefix', help = 'Prefix (e.g. "s/k:emissions/")') p_iterate.add_argument('key', nargs='*', help = 'Key parts') p_iterate = subparsers.add_parser('iterate_values', help = 'Iterate over items with some prefix, output values only') p_iterate.add_argument('prefix', help = 'Prefix (e.g. "s/k:emissions/")') p_iterate.add_argument('key', nargs='*', help = 'Key parts') return parser.parse_args() def run(args): dbpath = args.database if args.database is not None else 'data/application.db' keyformat = args.keyformat if args.keyformat is not None else '' valueformat = args.valueformat if args.valueformat is not None else 'b' if args.cmd == 'max_height' or args.key is None or len(args.key) == 0: key = None else: if len(args.key) > len(keyformat) + 1: raise Exception('Too many key elements for keyformat') key = [int(args.key[0])] for f, k in zip(keyformat, args.key[1:]): if f in ['i', 'I', 'q', 'Q']: key.append(int(k)) else: key.append(k) with plyvel.DB(dbpath) as db: if args.height is None or args.cmd == 'max_height': height = iavltree.max_height(db) else: height = args.height if args.cmd == 'max_height': print(height) elif args.cmd == 'get': result = iavltree.get(db, args.prefix, height, keyformat, key) if result is not None: print(decode_output(valueformat, result)) elif args.cmd == 'count': result = iavltree.count(db, args.prefix, height, keyformat, key = key) print(result) elif args.cmd == 'iterate' or args.cmd == 'iterate_keys' or args.cmd == 'iterate_values': it = iavltree.iterate(db, args.prefix, height, keyformat, key = key) try: for k, v in it: if args.cmd == 'iterate_keys': print(k) elif args.cmd == 'iterate_values': print(decode_output(valueformat,v)) else: print((k, decode_output(valueformat, v))) except BrokenPipeError: pass if __name__ == '__main__': args = get_args() run(args)