-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcli.py
More file actions
159 lines (135 loc) · 6.54 KB
/
Copy pathcli.py
File metadata and controls
159 lines (135 loc) · 6.54 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
from typing import Tuple, List, Any, Optional
import sys
import os
import io
import csv
import logging
import warnings
import argparse
import chardet
import tqdm
import comparesv
from version import __version__
from pprint import pprint
# This file is derived from https://github.com/maxharlow/csvmatch/blob/master/cli.py
def main() -> None:
    """Main entry point for the CLI application.

    Parses the command line, reads both CSV files, runs the comparison and,
    depending on the flags, saves the output files and/or prints the stats.
    Failures are reported as a single message on stderr with a non-zero
    exit status.
    """
    logging.captureWarnings(True)
    logging.basicConfig(level=logging.WARN, format='Warning: %(message)s')
    # Reduce every warning to its bare message; the logging format above adds the prefix.
    warnings.formatwarning = lambda e, *args: str(e)
    sys.stderr.write('Starting up...\n')
    try:
        file1, file2, args = arguments()
        data1, headers1 = read(*file1)
        data2, headers2 = read(*file2)
        results = comparesv.run(data1, headers1, data2, headers2, ticker=ticker, **args)
        if args.get("save_output"):
            save_file("values.csv", results['headers'], results['values'])
            save_file("results.csv", results['headers'], results['results'])
        if args.get("include_stats") and 'stats' in results:
            pprint(results['stats'])
        sys.stdout.flush()
    except SystemExit:
        # argparse leaves via SystemExit (status 0 for --help / --version).
        # Wrapping it in sys.exit(e) would print the status as a message and
        # force exit status 1, so let it propagate untouched.
        raise
    except BaseException as e:
        # Anything else: print only the message (no traceback) and exit non-zero.
        sys.exit(e)
def ticker(text: str, total: int) -> callable:
    """Build a tqdm progress bar and hand back its updater.

    Args:
        text: Label placed at the start of the bar
        total: Number of steps the bar represents

    Returns:
        The bound ``update`` method of the created progress bar
    """
    layout = text + ' |{bar}| {percentage:3.0f}% / {remaining} left'
    bar = tqdm.tqdm(total=total, bar_format=layout)
    return bar.update
def read(filename: str, encoding: Optional[str]) -> Tuple[List[List[Any]], List[str]]:
    """Read and parse a CSV file.

    Args:
        filename: Path to the CSV file or '-' for stdin
        encoding: Character encoding to use, or None for auto-detection

    Returns:
        Tuple of (data rows, headers)

    Raises:
        Exception: If the file does not exist, is empty, cannot be decoded
            (or its encoding cannot be detected), or is not valid CSV
    """
    if filename == '-':
        # sys.stdin is a text stream; the undecoded bytes are on its buffer.
        text = sys.stdin.buffer.read()
    else:
        if not os.path.isfile(filename):
            raise Exception(filename + ': no such file')
        # Context manager so the handle is closed even if the read fails.
        with io.open(filename, 'rb') as file:
            text = file.read()
    # `text` is bytes, so test for emptiness by truthiness rather than == ''.
    if not text:
        raise Exception(filename + ': file is empty')
    if not encoding:
        detector = chardet.universaldetector.UniversalDetector()
        # Feed line by line so detection can stop as soon as it is confident.
        for line in text.split(b'\n'):
            detector.feed(line)
            if detector.done:
                break
        detector.close()
        encoding = detector.result['encoding']  # can't always be relied upon
        if not encoding:
            # The detector may give up and report no encoding at all.
            raise Exception(filename + ': could not detect character encoding -- try specifying the encoding')
        sys.stderr.write(filename + ': autodetected character encoding as ' + encoding.upper() + '\n')
    try:
        text_decoded = text.decode(encoding)
        reader = csv.reader(io.StringIO(text_decoded, newline=None))
        # Default of None avoids leaking StopIteration when nothing is left
        # after decoding (e.g. a file holding only a byte-order mark).
        headers = next(reader, None)
        rows = list(reader)
    except UnicodeDecodeError as e:
        raise Exception(filename + ': could not read file -- try specifying the encoding') from e
    except csv.Error as e:
        raise Exception(filename + ': could not read file as a CSV') from e
    if headers is None:
        raise Exception(filename + ': file is empty')
    return rows, headers
def arguments() -> Tuple[Tuple[str, Optional[str]], Tuple[str, Optional[str]], dict]:
    """Define the command line interface and parse the arguments given.

    Prints the help text to stderr and exits with status 1 when neither
    input file is named (both positionals left at '-').

    Returns:
        Tuple of ((file1, encoding1), (file2, encoding2), remaining_options)
        where remaining_options holds every parsed option except the file
        names and encodings
    """
    cli = argparse.ArgumentParser(description='CSV files comparison')
    cli.add_argument('-v', '--version', action='version', version=__version__)

    # Registration order below is the order shown in --help; keep it stable.
    positionals = (
        ('FILE1', 'the first CSV file'),
        ('FILE2', 'the second CSV file'),
    )
    for name, description in positionals:
        cli.add_argument(name, nargs='?', default='-', help=description)

    encodings = (
        ('--enc1', 'encoding of the first file (default is to autodetect)'),
        ('--enc2', 'encoding of the second file (default is to autodetect)'),
    )
    for flag, description in encodings:
        cli.add_argument(flag, type=str, metavar='ENCODING', help=description)

    cli.add_argument('-i', '--ignore-case', action='store_true', help='ignore case (default is case-sensitive)')

    strategies = (
        ('-rm', '--row-match', 'order',
         "Logic to be used to identify the rows. Possible options 'order', 'fuzzy', 'deep' (default is order)"),
        ('-cm', '--column-match', 'exact',
         "Logic to be used to identify the columns. Possible options 'exact','fuzzy' (default is exact)"),
        ('-sm', '--string-match', 'exact',
         "Logic to be used to identify the columns. Possible options 'exact','fuzzy' (default is exact)"),
    )
    for short, long_name, fallback, description in strategies:
        cli.add_argument(short, long_name, default=fallback, help=description)

    switches = (
        ('-ir', '--include-addnl-rows', 'Include additional rows from second file (default is false)'),
        ('-ic', '--include-addnl-columns', 'Include additional columns from second file (default is false)'),
        ('-is', '--include-stats', 'Include stats (default is false)'),
        ('-s', '--save-output',
         'Save output to file. This saves the output in the current directory (default is false)'),
    )
    for short, long_name, description in switches:
        cli.add_argument(short, long_name, action='store_true', help=description)

    options = vars(cli.parse_args())
    if options['FILE1'] == '-' and options['FILE2'] == '-':
        cli.print_help(sys.stderr)
        cli.exit(1)

    # Strip the file/encoding entries so only the comparison options remain.
    first = (options.pop('FILE1'), options.pop('enc1'))
    second = (options.pop('FILE2'), options.pop('enc2'))
    return first, second, options
def save_file(file_name: str, keys: List[str], results: List[List[Any]]) -> None:
    """Write the rows to a CSV file, prefixed with a running serial number.

    Args:
        file_name: Output file name, resolved against the current directory
        keys: Column headers; an 'S.No' column is prepended
        results: Data rows; each is prepended with its 1-based position
    """
    header = ['S.No'] + keys
    # Number the rows before the file is opened, so bad input fails first.
    numbered = [[serial] + row for serial, row in enumerate(results, start=1)]
    destination = os.path.join(os.getcwd(), file_name)
    with open(destination, 'w', newline='') as handle:
        # Plain writer on purpose: the header is written even with no rows.
        out = csv.writer(handle, lineterminator='\n')
        out.writerow(header)
        for row in numbered:
            out.writerow(row)
def format(results: List[List[Any]], keys: List[str]) -> str:
    """Render the headers and rows as CSV text.

    Args:
        results: Data rows
        keys: Column headers, written as the first line

    Returns:
        CSV text with '\\n' line endings and the final character removed
    """
    buffer = io.StringIO()
    # Plain writer on purpose: the header line is emitted even with no rows.
    csv.writer(buffer, lineterminator='\n').writerows([keys, *results])
    rendered = buffer.getvalue()
    # Drop the last character (the terminator written after the final row).
    return rendered[:-1]
# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()