tweaks to the create table failover logic
[migration-tools.git] / mig-bin / mig-gsheet
1 #!/usr/bin/perl
2
3 use Env qw(
4     HOME PGHOST PGPORT PGUSER PGDATABASE MIGSCHEMA
5     MIGBASEWORKDIR MIGBASEGITDIR MIGGITDIR MIGWORKDIR
6 );
7 use Net::Google::Spreadsheets;
8 use Net::Google::DataAPI::Auth::OAuth2;
9 use Net::OAuth2::AccessToken;
10 use Storable;
11 use DBI;
12 use FindBin;
13 use lib "$FindBin::Bin/";
14 my $mig_bin = "$FindBin::Bin/";
15 use Mig;
16 use strict;
17 use Env qw(
18     HOME PGHOST PGPORT PGUSER PGDATABASE MIGSCHEMA
19     MIGBASEWORKDIR MIGBASEGITDIR MIGGITDIR MIGWORKDIR
20     );
21 use Switch;
22 use Cwd 'abs_path';
23 use Pod::Usage;
24 use Data::Dumper;
25 use DateTime;
26
27 pod2usage(-verbose => 2) if defined $ARGV[0] && $ARGV[0] eq '--help';
28
29 Mig::die_if_no_env_migschema();
30 die_if_gsheet_tracked_table_does_not_exist();
31 die_if_gsheet_tracked_column_does_not_exist();
32
33 my $cmd_push;
34 my $next_arg_is_push;
35 my $cmd_pull;
36 my $next_arg_is_pull;
37 my @worksheet_names;
38 my $cmd_export = 0;
39 my @table_names;
40 my $sql;
41 my $sth;
42 my @ws;
43 my @tracked_ws_names;
44
45 foreach my $arg (@ARGV) {
46     if ($arg eq '--push') {
47         $next_arg_is_push = 1;
48         next;
49     }
50     if ($next_arg_is_push) {
51         $cmd_push = $arg;
52         $next_arg_is_push = 0;
53         next;
54     }
55     if ($arg eq '--pull') {
56         $next_arg_is_pull = 1;
57         next;
58     }
59     if ($next_arg_is_pull) {
60         $cmd_pull = $arg;
61         $next_arg_is_pull = 0;
62         next;
63     }
64     if ($arg eq '--export') {
65         $cmd_export = 1;
66         next;
67     }
68 }
69
70 abort('must specify --push (db->worksheets) or --pull (worksheets->db)') unless (defined $cmd_push or defined $cmd_pull);
71 if (defined $cmd_push and defined $cmd_pull) { abort('you can not specify both a --push and --pull on the same command'); }
72
73 my $dbh = Mig::db_connect();
74 my $spreadsheet = connect_gsheet();
75 abort('could not connect to google sheet') unless (defined $spreadsheet);
76
77 $sql = 'SELECT tab_name FROM gsheet_tracked_table;';
78 $sth = $dbh->prepare($sql);
79 my $ra = $sth->execute();
80 while (my @row = $sth->fetchrow_array) {
81     push @tracked_ws_names, $row[0];
82 }
83
84 if (defined $cmd_pull) {
85     print "Pulling ";
86     if ($cmd_pull eq 'all') {
87         print "all worksheets.\n";
88         @ws = $spreadsheet->worksheets;
89         foreach my $wsn (@ws) { push @worksheet_names, $wsn->title; }
90     } else {
91         print "only worksheet $cmd_pull.\n";
92         if (!defined $cmd_pull) { abort('command incomplete'); } 
93         push @worksheet_names, $cmd_pull;
94     }
95     my @m = array_match(\@worksheet_names,\@tracked_ws_names);
96     foreach my $w (@m) { 
97         my $pull_ws = $spreadsheet->worksheet( {title => $w} ); 
98         my $push_tb = get_table_name($w,$dbh);
99         my @rows = $pull_ws->rows;
100         my @content;
101         map { $content[$_->row - 1][$_->col - 1] = $_->content } $pull_ws->cells;
102         my @tab_headers = shift @content;
103         my $tab_headers_length = $#{ $tab_headers[0] };
104         my @pg_headers;
105         for my $i ( 0 .. $tab_headers_length ) {
106             push @pg_headers, $tab_headers[0][$i];
107         }
108         shift @content;
109         #todo: check for clean headers at some point ...
110         truncate_table($push_tb,$dbh);
111         print "Inserting from $w to $push_tb.\n";
112         for my $j (@content) {
113             insert_row($MIGSCHEMA,$push_tb,$dbh,\@pg_headers,$j);
114         }
115         timestamp($push_tb,$dbh,'pull');
116         if ($cmd_export == 1) { export_table($dbh,$push_tb); }
117     }
118 }
119
120 if (defined $cmd_push) {
121     print "Pushing ";
122     my @tab_names;
123     if ($cmd_push eq 'all') {
124         print "all worksheets.\n";
125         $sql = 'SELECT tab_name FROM gsheet_tracked_table;';
126         $sth = $dbh->prepare($sql);
127         $ra = $sth->execute();
128         while (my @row = $sth->fetchrow_array) {
129             push @tab_names, $row[0];
130         }
131     } else {
132         print "only worksheet $cmd_push.\n";
133         if (!defined $cmd_push) { abort('command incomplete'); }
134         push @tab_names, $cmd_push;
135     }
136     foreach my $push_ws_name (@tab_names) {
137         my $pull_tb = get_table_name($push_ws_name,$dbh);
138         my @table_headers = get_pg_column_headers($pull_tb,$MIGSCHEMA);
139         print "worksheetname: $push_ws_name\n";
140         my $push_ws = $spreadsheet->worksheet( {title => $push_ws_name} );
141         if (!defined $push_ws) { next; }
142         my @rows;
143         my $i = 0;
144         foreach my $rth (@table_headers) { $rows[0][$i] = $rth; $i++; }         
145         $sql = "SELECT * FROM $pull_tb;";
146         $sth = $dbh->prepare($sql);
147         $sth->execute();
148         my $grabhash = $sth->fetchall_arrayref({});
149         erase_sheet($push_ws,$push_ws_name);
150
151         #get from postgres the headers to use in the sheet from tracked columns
152         $sql = 'SELECT column_name FROM gsheet_tracked_column WHERE table_id = (SELECT id FROM gsheet_tracked_table WHERE table_name = \'' . $pull_tb . '\')';
153         $sth = $dbh->prepare($sql);
154         $sth->execute();
155         my $sheet_headers = $sth->fetchall_arrayref();
156         my $sheet_headers_length = @$sheet_headers;
157         #now I need to do new rows using those headers
158         my @content;
159         foreach my $row ( @{$grabhash} ) {
160             my $record = {};
161             for my $column ( sort keys %{ $row } ) {
162                 #print Dumper(@$sheet_headers);
163                 #print "column: $column\n";
164                 my $clean_column = $column;
165                 $clean_column =~ s/_//g;
166                 if ( $column ~~ @$sheet_headers ) {
167                     $record->{$clean_column} = $row->{$column}; 
168                 }
169             }
170             push @content, $record;
171         }
172         print "Writing to $push_ws_name\n";
173         foreach my $fillsheet (@content) {
174             my $new_row = $push_ws->add_row (
175                 $fillsheet
176             );
177         }
178         timestamp($pull_tb,$dbh,'push');
179         if ($cmd_export == 1) { export_table($dbh,$pull_tb); }
180     }
181 }   
182
183 sub export_table {
184     my $dbh = shift;
185     my $table = shift;
186
187     my $dt = DateTime->now;
188     my $date = $dt->ymd;
189     my $hms = $dt->hms;
190     my $efile = $MIGGITDIR . $table . '_' . $date . '_' . $hms . '.tsv';
191     my @data;
192     my $record_count = 0;
193     $dbh->do("COPY $table TO STDOUT CSV DELIMITER E'\t' HEADER;");
194     1 while $dbh->pg_getcopydata(\$data[$record_count++]) >= 0;
195     open (my $eout, '>', $efile) or abort("Could NOT open $efile.");
196     foreach my $d (@data) {
197         print $eout $d;
198     }
199     print "$efile written.\n";
200     close $eout;
201     return;
202 }
203
204 sub die_if_gsheet_tracked_table_does_not_exist {
205     if (!check_for_gsheet_tracked_table()) {
206         die "Table gsheet_tracked_table does not exist.  Bailing...\n";
207     }
208 }
209
210 sub array_match {
211     my ($xa,$xb) = @_;
212     my @a = @{ $xa };
213     my @b = @{ $xb };
214     my @r;
215
216     foreach my $av (@a) {
217         foreach my $bv (@b) {
218             if ($av eq $bv) { push @r, $bv; }
219         }
220     }    
221     return @r;
222 }
223
224 sub get_pg_column_headers {
225     my $table_name = shift;
226     my $schema_name = shift;
227     my @headers;
228     my $dbh = Mig::db_connect();
229     $sql = 'SELECT column_name FROM information_schema.columns WHERE table_schema = ' . $dbh->quote( $schema_name ) . ' AND table_name = ' . $dbh->quote( $table_name ) . ';';
230     $sth = $dbh->prepare($sql);
231     $ra = $sth->execute();
232     while (my @row = $sth->fetchrow_array) {
233         push @headers, $row[0];
234     }
235     return @headers;
236 }
237
238 sub erase_sheet {
239     my $ws = shift;
240     my $ws_name = shift;
241
242     print "Erasing $ws_name.\n";
243     my @rows = $ws->rows;
244     splice @rows, 0, 1;
245     my $i = @rows;
246     while ($i > 0) {
247         my $row = pop @rows;
248         $row->delete;
249         $i--;
250     }
251     return;
252 }
253
254 sub check_for_gsheet_tracked_table {
255     my $dbh = Mig::db_connect();
256     my $sth = $dbh->prepare("
257         SELECT EXISTS(
258             SELECT 1
259             FROM information_schema.tables
260             WHERE table_schema = " . $dbh->quote( $MIGSCHEMA ) . "
261             AND table_name = 'gsheet_tracked_table'
262         );"
263     );
264     my $rv = $sth->execute()
265         || die "Error checking for table (tracked_gsheet_table): $!";
266     my @cols = $sth->fetchrow_array;
267     $sth->finish;
268     Mig::db_disconnect($dbh);
269     return $cols[0];
270 }
271
272 sub die_if_gsheet_tracked_column_does_not_exist {
273     if (!check_for_gsheet_tracked_column()) {
274         die "Table $MIGSCHEMA.gsheet_tracked_column does not exist.  Bailing...\n";
275     }
276 }
277
278 sub get_table_name {
279     my $worksheet = shift;
280     my $dbh = shift;
281
282     my $sql = 'SELECT table_name FROM gsheet_tracked_table WHERE tab_name = \'' . $worksheet . '\';';
283     my $sth = $dbh->prepare($sql);
284     my $xs = $sth->execute();
285     my $table_name;
286     while (my @row = $sth->fetchrow_array) {
287         $table_name = $row[0];
288     }
289
290     return $table_name;
291 }
292
293 #sub get_worksheet_name {
294 #    my $table = shift;
295 #    my $dbh = shift;
296 #
297 #    my $sql = 'SELECT tab_name FROM gsheet_tracked_table WHERE table_name = \'' . $table . '\';';
298 #    print "$sql \n";
299 #    my $sth = $dbh->prepare($sql);
300 #    my $xs = $sth->execute();
301 #    my $worksheet_name;
302 #    while (my @row = $sth->fetchrow_array) {
303 #        $worksheet_name = $row[0];
304 #    }
305 #
306 #    return $worksheet_name;
307 #}
308
309
310 sub check_for_gsheet_tracked_column {
311     my $dbh = Mig::db_connect();
312     my $sth = $dbh->prepare("
313         SELECT EXISTS(
314             SELECT 1
315             FROM information_schema.tables
316             WHERE table_schema = " . $dbh->quote( $MIGSCHEMA ) . "
317             AND table_name = 'gsheet_tracked_column'
318         );"
319     );
320     my $rv = $sth->execute()
321         || die "Error checking for table (gsheet_tracked_column): $!";
322     my @cols = $sth->fetchrow_array;
323     $sth->finish;
324     Mig::db_disconnect($dbh);
325     return $cols[0];
326 }
327
328 sub insert_row {
329     my ($schema, $table, $dbh, $headers_ref, $row_ref) = @_;
330     my @headers = @{ $headers_ref };
331     my @row_data = @{ $row_ref };
332
333     my $header_string = '(' . join(",", @headers) . ')';
334     map {s/\'/\'\'/g; } @row_data;
335     my $row_string = '(' . join(",", map {qq/'$_'/} @row_data) . ')';
336     #print "INSERT INTO $schema.$table $header_string VALUES $row_string\n"; 
337     $dbh->do(qq/
338         INSERT INTO $schema.$table $header_string VALUES $row_string ;
339     /);
340 }
341
342 sub timestamp {
343     my ($table, $dbh, $action) = @_;
344
345     my $column;
346     if ($action eq 'pull') { $column = 'last_pulled' }
347         else { $column = 'last_pushed' }; 
348
349     $dbh->do(qq/
350         UPDATE gsheet_tracked_table SET $column = NOW() WHERE table_name = '$table';
351     /);
352
353 }
354
355
356 sub truncate_table {
357     my $table = shift;
358     my $dbh = shift;
359
360     $dbh->do(qq/
361         TRUNCATE TABLE $table;;
362     /);
363     print "Table $table truncated.\n";
364 }
365
366 sub abort {
367     my $msg = shift;
368     print STDERR "$0: $msg", "\n";
369     exit 1;
370 }
371
372 sub connect_gsheet {
373     if (!defined $ENV{'CLIENTID'}) {
374         exec '/bin/bash', '--init-file', '~/.mig/oauth.env';
375         print "Open Authentication settings were not loaded, please re-run.\n";
376     }
377     my $session_filename = $ENV{SESSIONFILE};
378     my $oauth2 = Net::Google::DataAPI::Auth::OAuth2->new(
379         client_id => $ENV{CLIENTID},
380         client_secret => $ENV{CLIENTSECRET},
381         scope => ['http://spreadsheets.google.com/feeds/'],
382         redirect_uri => 'https://developers.google.com/oauthplayground',
383     );
384     my $session = retrieve($session_filename);
385     my $restored_token = Net::OAuth2::AccessToken->session_thaw(
386         $session,
387         auto_refresh => 1,
388         profile => $oauth2->oauth2_webserver,
389     );
390     $oauth2->access_token($restored_token);
391     my $service = Net::Google::Spreadsheets->new(auth => $oauth2);
392
393     my $spreadsheet = $service->spreadsheet(
394         {  
395             title => $MIGSCHEMA
396         }
397     );
398     return $spreadsheet;
399 }
400
401