first cuts of kmig-*iconv
[migration-tools.git] / emig.d / bin / mig-gsheet
1 #!/usr/bin/perl
2
3 use Env qw(
4     HOME PGHOST PGPORT PGUSER PGDATABASE MIGSCHEMA
5     MIGBASEWORKDIR MIGBASEGITDIR MIGGITDIR MIGWORKDIR
6 );
7 use Net::Google::Spreadsheets;
8 use Net::Google::DataAPI::Auth::OAuth2;
9 use Net::OAuth2::AccessToken;
10 use Storable;
11 use DBI;
12 use FindBin;
13 use lib "$FindBin::Bin/";
14 my $mig_bin = "$FindBin::Bin/";
15 use Mig;
16 use strict;
17 use Switch;
18 use Cwd 'abs_path';
19 use Pod::Usage;
20 use Data::Dumper;
21 use DateTime;
22
23 pod2usage(-verbose => 2) if defined $ARGV[0] && $ARGV[0] eq '--help';
24
25 Mig::die_if_no_env_migschema();
26 die_if_gsheet_tracked_table_does_not_exist();
27 die_if_gsheet_tracked_column_does_not_exist();
28
29 my $cmd_push;
30 my $next_arg_is_push;
31 my $cmd_pull;
32 my $next_arg_is_pull;
33 my @worksheet_names;
34 my $cmd_export = 0;
35 my @table_names;
36 my $sql;
37 my $sth;
38 my @ws;
39 my @tracked_ws_names;
40 my $authfile = $ENV{HOME} . '/.mig/oauth.env';
41 my $next_arg_is_authfile;
42
43 foreach my $arg (@ARGV) {
44     if ($arg eq '--push') {
45         $next_arg_is_push = 1;
46         next;
47     }
48     if ($next_arg_is_push) {
49         $cmd_push = $arg;
50         $next_arg_is_push = 0;
51         next;
52     }
53     if ($arg eq '--pull') {
54         $next_arg_is_pull = 1;
55         next;
56     }
57     if ($next_arg_is_pull) {
58         $cmd_pull = $arg;
59         $next_arg_is_pull = 0;
60         next;
61     }
62     if ($arg eq '--authfile') {
63         $next_arg_is_authfile = 1;
64         next;
65     }
66     if ($next_arg_is_authfile) {
67         $authfile = $arg;
68         $next_arg_is_authfile = 0;
69         next;
70     }
71     if ($arg eq '--export') {
72         $cmd_export = 1;
73         next;
74     }
75 }
76
77 abort('must specify --push (db->worksheets) or --pull (worksheets->db)') unless (defined $cmd_push or defined $cmd_pull);
78 if (defined $cmd_push and defined $cmd_pull) { abort('you can not specify both a --push and --pull on the same command'); }
79
80 my $clientid;
81 my $clientsecret;
82 my $sessionfile;
83
84 open (my $fh, '<', $authfile) or abort("Could not open $authfile");
85 while (my $var = <$fh>) {
86     chomp $var;
87     my ($var1, $var2) = split /=/,$var;
88     if ($var1 eq 'CLIENTID') { $clientid = $var2; }
89     if ($var1 eq 'CLIENTSECRET') { $clientsecret = $var2; }
90     if ($var1 eq 'SESSIONFILE') { $sessionfile = $var2; }   
91 }
92 my $dbh = Mig::db_connect();
93 my $spreadsheet = connect_gsheet($clientid,$clientsecret,$sessionfile);
94 abort('could not connect to google sheet') unless (defined $spreadsheet);
95
96 $sql = 'SELECT tab_name FROM gsheet_tracked_table;';
97 $sth = $dbh->prepare($sql);
98 my $ra = $sth->execute();
99 while (my @row = $sth->fetchrow_array) {
100     push @tracked_ws_names, $row[0];
101 }
102
103 if (defined $cmd_pull) {
104     print "Pulling ";
105     if ($cmd_pull eq 'all') {
106         print "all worksheets.\n";
107         @ws = $spreadsheet->worksheets;
108         foreach my $wsn (@ws) { push @worksheet_names, $wsn->title; }
109     } else {
110         print "only worksheet $cmd_pull.\n";
111         if (!defined $cmd_pull) { abort('command incomplete'); } 
112         push @worksheet_names, $cmd_pull;
113     }
114     my @m = array_match(\@worksheet_names,\@tracked_ws_names);
115     foreach my $w (@m) { 
116         my $pull_ws = $spreadsheet->worksheet( {title => $w} ); 
117         my $push_tb = get_table_name($w,$dbh);
118         my @rows = $pull_ws->rows;
119         my @content;
120         map { $content[$_->row - 1][$_->col - 1] = $_->content } $pull_ws->cells;
121         my @tab_headers = shift @content;
122         my $tab_headers_length = $#{ $tab_headers[0] };
123         my @pg_headers;
124         for my $i ( 0 .. $tab_headers_length ) {
125             push @pg_headers, $tab_headers[0][$i];
126         }
127         shift @content;
128         #todo: check for clean headers at some point ...
129         truncate_table($push_tb,$dbh);
130         print "Inserting from $w to $push_tb.\n";
131         for my $j (@content) {
132             insert_row($MIGSCHEMA,$push_tb,$dbh,\@pg_headers,$j);
133         }
134         timestamp($push_tb,$dbh,'pull');
135         if ($cmd_export == 1) { export_table($dbh,$push_tb); }
136     }
137 }
138
139 if (defined $cmd_push) {
140     print "Pushing ";
141     my @tab_names;
142     if ($cmd_push eq 'all') {
143         print "all worksheets.\n";
144         $sql = 'SELECT tab_name FROM gsheet_tracked_table;';
145         $sth = $dbh->prepare($sql);
146         $ra = $sth->execute();
147         while (my @row = $sth->fetchrow_array) {
148             push @tab_names, $row[0];
149         }
150     } else {
151         print "only worksheet $cmd_push.\n";
152         if (!defined $cmd_push) { abort('command incomplete'); }
153         push @tab_names, $cmd_push;
154     }
155     foreach my $push_ws_name (@tab_names) {
156         my $pull_tb = get_table_name($push_ws_name,$dbh);
157         my @table_headers = get_pg_column_headers($pull_tb,$MIGSCHEMA);
158         print "worksheetname: $push_ws_name\n";
159         my $push_ws = $spreadsheet->worksheet( {title => $push_ws_name} );
160         if (!defined $push_ws) { next; }
161         my @rows;
162         my $i = 0;
163         foreach my $rth (@table_headers) { $rows[0][$i] = $rth; $i++; }         
164         $sql = "SELECT * FROM $pull_tb;";
165         $sth = $dbh->prepare($sql);
166         $sth->execute();
167         my $grabhash = $sth->fetchall_arrayref({});
168         erase_sheet($push_ws,$push_ws_name);
169
170         #get from postgres the headers to use in the sheet from tracked columns
171         $sql = 'SELECT column_name FROM gsheet_tracked_column WHERE table_id = (SELECT id FROM gsheet_tracked_table WHERE table_name = \'' . $pull_tb . '\')';
172         $sth = $dbh->prepare($sql);
173         $sth->execute();
174         my $sheet_headers = $sth->fetchall_arrayref();
175         my $sheet_headers_length = @$sheet_headers;
176         #now I need to do new rows using those headers
177         my @content;
178         foreach my $row ( @{$grabhash} ) {
179             my $record = {};
180             for my $column ( sort keys %{ $row } ) {
181                 #print Dumper(@$sheet_headers);
182                 #print "column: $column\n";
183                 my $clean_column = $column;
184                 $clean_column =~ s/_//g;
185                 if ( $column ~~ @$sheet_headers ) {
186                     $record->{$clean_column} = $row->{$column}; 
187                 }
188             }
189             push @content, $record;
190         }
191         print "Writing to $push_ws_name\n";
192         foreach my $fillsheet (@content) {
193             my $new_row = $push_ws->add_row (
194                 $fillsheet
195             );
196         }
197         timestamp($pull_tb,$dbh,'push');
198         if ($cmd_export == 1) { export_table($dbh,$pull_tb); }
199     }
200 }   
201
202 sub export_table {
203     my $dbh = shift;
204     my $table = shift;
205
206     my $dt = DateTime->now;
207     my $date = $dt->ymd;
208     my $hms = $dt->hms;
209     my $efile = $MIGGITDIR . $table . '_' . $date . '_' . $hms . '.tsv';
210     my @data;
211     my $record_count = 0;
212     $dbh->do("COPY $table TO STDOUT CSV DELIMITER E'\t' HEADER;");
213     1 while $dbh->pg_getcopydata(\$data[$record_count++]) >= 0;
214     open (my $eout, '>', $efile) or abort("Could NOT open $efile.");
215     foreach my $d (@data) {
216         print $eout $d;
217     }
218     print "$efile written.\n";
219     close $eout;
220     return;
221 }
222
223 sub die_if_gsheet_tracked_table_does_not_exist {
224     if (!check_for_gsheet_tracked_table()) {
225         die "Table gsheet_tracked_table does not exist.  Bailing...\n";
226     }
227 }
228
229 sub array_match {
230     my ($xa,$xb) = @_;
231     my @a = @{ $xa };
232     my @b = @{ $xb };
233     my @r;
234
235     foreach my $av (@a) {
236         foreach my $bv (@b) {
237             if ($av eq $bv) { push @r, $bv; }
238         }
239     }    
240     return @r;
241 }
242
243 sub get_pg_column_headers {
244     my $table_name = shift;
245     my $schema_name = shift;
246     my @headers;
247     my $dbh = Mig::db_connect();
248     $sql = 'SELECT column_name FROM information_schema.columns WHERE table_schema = ' . $dbh->quote( $schema_name ) . ' AND table_name = ' . $dbh->quote( $table_name ) . ';';
249     $sth = $dbh->prepare($sql);
250     $ra = $sth->execute();
251     while (my @row = $sth->fetchrow_array) {
252         push @headers, $row[0];
253     }
254     return @headers;
255 }
256
257 sub erase_sheet {
258     my $ws = shift;
259     my $ws_name = shift;
260
261     print "Erasing $ws_name.\n";
262     my @rows = $ws->rows;
263     splice @rows, 0, 1;
264     my $i = @rows;
265     while ($i > 0) {
266         my $row = pop @rows;
267         $row->delete;
268         $i--;
269     }
270     return;
271 }
272
273 sub check_for_gsheet_tracked_table {
274     my $dbh = Mig::db_connect();
275     my $sth = $dbh->prepare("
276         SELECT EXISTS(
277             SELECT 1
278             FROM information_schema.tables
279             WHERE table_schema = " . $dbh->quote( $MIGSCHEMA ) . "
280             AND table_name = 'gsheet_tracked_table'
281         );"
282     );
283     my $rv = $sth->execute()
284         || die "Error checking for table (tracked_gsheet_table): $!";
285     my @cols = $sth->fetchrow_array;
286     $sth->finish;
287     Mig::db_disconnect($dbh);
288     return $cols[0];
289 }
290
291 sub die_if_gsheet_tracked_column_does_not_exist {
292     if (!check_for_gsheet_tracked_column()) {
293         die "Table $MIGSCHEMA.gsheet_tracked_column does not exist.  Bailing...\n";
294     }
295 }
296
297 sub get_table_name {
298     my $worksheet = shift;
299     my $dbh = shift;
300
301     my $sql = 'SELECT table_name FROM gsheet_tracked_table WHERE tab_name = \'' . $worksheet . '\';';
302     my $sth = $dbh->prepare($sql);
303     my $xs = $sth->execute();
304     my $table_name;
305     while (my @row = $sth->fetchrow_array) {
306         $table_name = $row[0];
307     }
308
309     return $table_name;
310 }
311
312 #sub get_worksheet_name {
313 #    my $table = shift;
314 #    my $dbh = shift;
315 #
316 #    my $sql = 'SELECT tab_name FROM gsheet_tracked_table WHERE table_name = \'' . $table . '\';';
317 #    print "$sql \n";
318 #    my $sth = $dbh->prepare($sql);
319 #    my $xs = $sth->execute();
320 #    my $worksheet_name;
321 #    while (my @row = $sth->fetchrow_array) {
322 #        $worksheet_name = $row[0];
323 #    }
324 #
325 #    return $worksheet_name;
326 #}
327
328
329 sub check_for_gsheet_tracked_column {
330     my $dbh = Mig::db_connect();
331     my $sth = $dbh->prepare("
332         SELECT EXISTS(
333             SELECT 1
334             FROM information_schema.tables
335             WHERE table_schema = " . $dbh->quote( $MIGSCHEMA ) . "
336             AND table_name = 'gsheet_tracked_column'
337         );"
338     );
339     my $rv = $sth->execute()
340         || die "Error checking for table (gsheet_tracked_column): $!";
341     my @cols = $sth->fetchrow_array;
342     $sth->finish;
343     Mig::db_disconnect($dbh);
344     return $cols[0];
345 }
346
347 sub insert_row {
348     my ($schema, $table, $dbh, $headers_ref, $row_ref) = @_;
349     my @headers = @{ $headers_ref };
350     my @row_data = @{ $row_ref };
351
352     my $header_string = '(' . join(",", @headers) . ')';
353     map {s/\'/\'\'/g; } @row_data;
354     my $row_string = '(' . join(",", map {qq/'$_'/} @row_data) . ')';
355     #print "INSERT INTO $schema.$table $header_string VALUES $row_string\n"; 
356     $dbh->do(qq/
357         INSERT INTO $schema.$table $header_string VALUES $row_string ;
358     /);
359 }
360
361 sub timestamp {
362     my ($table, $dbh, $action) = @_;
363
364     my $column;
365     if ($action eq 'pull') { $column = 'last_pulled' }
366         else { $column = 'last_pushed' }; 
367
368     $dbh->do(qq/
369         UPDATE gsheet_tracked_table SET $column = NOW() WHERE table_name = '$table';
370     /);
371
372 }
373
374
375 sub truncate_table {
376     my $table = shift;
377     my $dbh = shift;
378
379     $dbh->do(qq/
380         TRUNCATE TABLE $table;;
381     /);
382     print "Table $table truncated.\n";
383 }
384
385 sub abort {
386     my $msg = shift;
387     print STDERR "$0: $msg", "\n";
388     exit 1;
389 }
390
391 sub connect_gsheet {
392
393     my ($clientid,$clientsecret,$sessionfile) = @_;
394
395     my $oauth2 = Net::Google::DataAPI::Auth::OAuth2->new(
396         client_id => $clientid,
397         client_secret => $clientsecret,
398         scope => ['http://spreadsheets.google.com/feeds/'],
399         redirect_uri => 'https://developers.google.com/oauthplayground',
400     );
401     if ($sessionfile =~ m/~/) {$sessionfile =~ s/~/$ENV{HOME}/; }
402     my $session = retrieve($sessionfile);
403     my $restored_token = Net::OAuth2::AccessToken->session_thaw(
404         $session,
405         auto_refresh => 1,
406         profile => $oauth2->oauth2_webserver,
407     );
408     $oauth2->access_token($restored_token);
409     my $service = Net::Google::Spreadsheets->new(auth => $oauth2);
410
411     my $spreadsheet = $service->spreadsheet(
412         {  
413             title => $MIGSCHEMA
414         }
415     );
416     return $spreadsheet;
417 }
418
419