#!/usr/bin/perl

# Synchronise tracked PostgreSQL staging tables with the worksheets of a
# Google spreadsheet whose title equals $MIGSCHEMA.
#
#   --push NAME|all    database table(s) -> worksheet(s)
#   --pull NAME|all    worksheet(s)      -> database table(s)
#   --authfile FILE    file holding CLIENTID=, CLIENTSECRET= and SESSIONFILE=
#                      (default: $HOME/.emig/oauth.env)
#   --export           also dump every processed table as a TSV file
#   --help             show the POD of this script
#
# Which worksheet belongs to which table is read from gsheet_tracked_table
# (columns used here: tab_name, table_name, id, last_pulled, last_pushed) and
# gsheet_tracked_column (columns used here: table_id, column_name).

use strict;
use warnings;

use Env qw(
    HOME PGHOST PGPORT PGUSER PGDATABASE MIGSCHEMA
    MIGBASEWORKDIR MIGBASEGITDIR MIGGITDIR MIGWORKDIR
);
use Net::Google::Spreadsheets;
use Net::Google::DataAPI::Auth::OAuth2;
use Net::OAuth2::AccessToken;
use Storable;
use DBI;
use FindBin;
use lib "$FindBin::Bin/";
my $mig_bin = "$FindBin::Bin/";
use EMig;
use Cwd 'abs_path';
use Pod::Usage;
use Data::Dumper;
use DateTime;

# NOTE(review): Switch is a source filter and none of its statements are used
# in this file; it is kept only because dropping a file-level dependency is
# outside the scope of this change.  The filter scans the raw text below this
# line, so avoid introducing its two keywords there, even inside comments.
use Switch;

pod2usage(-verbose => 2) if defined $ARGV[0] && $ARGV[0] eq '--help';

EMig::die_if_no_env_migschema();
die_if_gsheet_tracked_table_does_not_exist();
die_if_gsheet_tracked_column_does_not_exist();

my ($cmd_push, $cmd_pull, $cmd_export, $authfile) = _parse_command_line(@ARGV);

abort('must specify --push (db->worksheets) or --pull (worksheets->db)')
    unless (defined $cmd_push or defined $cmd_pull);
if (defined $cmd_push and defined $cmd_pull) {
    abort('you can not specify both a --push and --pull on the same command');
}

my ($clientid, $clientsecret, $sessionfile) = _read_authfile($authfile);

my $dbh = EMig::db_connect();
my $spreadsheet = connect_gsheet($clientid, $clientsecret, $sessionfile);
abort('could not connect to google sheet') unless (defined $spreadsheet);

# Worksheet names that are registered for synchronisation.
my @tracked_ws_names = @{
    $dbh->selectcol_arrayref('SELECT tab_name FROM gsheet_tracked_table;') || []
};

if (defined $cmd_pull) {
    print "Pulling ";
    my @worksheet_names;
    if ($cmd_pull eq 'all') {
        print "all worksheets.\n";
        @worksheet_names = map { $_->title } $spreadsheet->worksheets;
    }
    else {
        print "only worksheet $cmd_pull.\n";
        @worksheet_names = ($cmd_pull);
    }

    # Only worksheets that are registered in gsheet_tracked_table are pulled.
    for my $ws_name (array_match(\@worksheet_names, \@tracked_ws_names)) {
        _pull_worksheet($spreadsheet, $dbh, $ws_name, $cmd_export);
    }
}

if (defined $cmd_push) {
    print "Pushing ";
    my @tab_names;
    if ($cmd_push eq 'all') {
        print "all worksheets.\n";
        @tab_names = @tracked_ws_names;
    }
    else {
        print "only worksheet $cmd_push.\n";
        @tab_names = ($cmd_push);
    }

    for my $ws_name (@tab_names) {
        _push_worksheet($spreadsheet, $dbh, $ws_name, $cmd_export);
    }
}

EMig::db_disconnect($dbh);

# Parse the command line.
#
# Takes the raw argument list and returns
# ($cmd_push, $cmd_pull, $cmd_export, $authfile); the first two are undef
# when the corresponding option was not given.  Aborts with
# 'command incomplete' when --push, --pull or --authfile is the last argument
# and therefore has no value.  Unknown arguments are ignored.
sub _parse_command_line {
    my @args = @_;

    my $home     = defined $ENV{HOME} ? $ENV{HOME} : '';
    my $authfile = $home . '/.emig/oauth.env';
    my $cmd_push;
    my $cmd_pull;
    my $cmd_export = 0;
    my $next_arg_is_push     = 0;
    my $next_arg_is_pull     = 0;
    my $next_arg_is_authfile = 0;

    # The order of these tests is significant: an option name is recognised
    # before a pending value is consumed, exactly as the script always did.
    foreach my $arg (@args) {
        if ($arg eq '--push') {
            $next_arg_is_push = 1;
            next;
        }
        if ($next_arg_is_push) {
            $cmd_push = $arg;
            $next_arg_is_push = 0;
            next;
        }
        if ($arg eq '--pull') {
            $next_arg_is_pull = 1;
            next;
        }
        if ($next_arg_is_pull) {
            $cmd_pull = $arg;
            $next_arg_is_pull = 0;
            next;
        }
        if ($arg eq '--authfile') {
            $next_arg_is_authfile = 1;
            next;
        }
        if ($next_arg_is_authfile) {
            $authfile = $arg;
            $next_arg_is_authfile = 0;
            next;
        }
        if ($arg eq '--export') {
            $cmd_export = 1;
            next;
        }
    }

    # An option that never received its value.
    if ($next_arg_is_push or $next_arg_is_pull or $next_arg_is_authfile) {
        abort('command incomplete');
    }

    return ($cmd_push, $cmd_pull, $cmd_export, $authfile);
}

# Read the OAuth settings file.
#
# The file holds KEY=VALUE lines; CLIENTID, CLIENTSECRET and SESSIONFILE are
# returned in that order.  Only the first '=' separates key and value, so a
# value may itself contain '='.  Aborts when the file cannot be opened or one
# of the three settings is missing.
sub _read_authfile {
    my ($path) = @_;

    open(my $fh, '<', $path) or abort("Could not open $path");
    my %setting;
    while (my $line = <$fh>) {
        chomp $line;
        my ($key, $value) = split /=/, $line, 2;
        next unless defined $key && defined $value;
        $setting{$key} = $value;
    }
    close $fh;

    for my $required (qw(CLIENTID CLIENTSECRET SESSIONFILE)) {
        next if defined $setting{$required};
        abort("$path must define CLIENTID, CLIENTSECRET and SESSIONFILE");
    }

    return @setting{qw(CLIENTID CLIENTSECRET SESSIONFILE)};
}

# Replace the content of the table tracked for worksheet $ws_name with the
# content of that worksheet.
#
# Sheet row 1 supplies the column names.  Sheet row 2 is skipped and never
# loaded - NOTE(review): erase_sheet() preserves one row as well, so this
# looks like a deliberate description/template row; confirm.
# The worksheet is skipped (with a warning, before the table is truncated)
# when it cannot be found, has no header row, or has a header that is not a
# single SQL identifier.
sub _pull_worksheet {
    my ($spreadsheet, $dbh, $ws_name, $export) = @_;

    my $worksheet = $spreadsheet->worksheet( { title => $ws_name } );
    if (!defined $worksheet) {
        warn "Worksheet $ws_name not found in the spreadsheet; skipping.\n";
        return;
    }

    my $table = get_table_name($ws_name, $dbh);
    if (!defined $table) {
        warn "No tracked table for worksheet $ws_name; skipping.\n";
        return;
    }

    # Rebuild the sheet as a zero-based grid from the individual cells.
    # Rows or cells without an entry stay undef.
    my @grid;
    for my $cell ($worksheet->cells) {
        $grid[ $cell->row() - 1 ][ $cell->col() - 1 ] = $cell->content;
    }

    my $header_row = shift @grid;
    if (!defined $header_row) {
        warn "Worksheet $ws_name has no header row; $table left untouched.\n";
        return;
    }
    my @pg_headers = @{ $header_row };
    shift @grid;    # sheet row 2 is not data (see the note above)

    if (grep { !_is_plain_identifier($_) } @pg_headers) {
        warn "Worksheet $ws_name has unusable column headers; "
            . "$table left untouched.\n";
        return;
    }

    truncate_table($table, $dbh);
    print "Inserting from $ws_name to $table.\n";
    for my $row (@grid) {
        next unless defined $row;    # completely blank sheet row
        insert_row($MIGSCHEMA, $table, $dbh, \@pg_headers, $row);
    }
    timestamp($table, $dbh, 'pull');

    if ($export == 1) {
        export_table($dbh, $table);
    }
    return;
}

# Replace the rows of worksheet $ws_name with the content of its tracked
# table, restricted to the columns listed in gsheet_tracked_column.
#
# Underscores are removed from the column names to form the keys handed to
# add_row (presumably matching how the list feed normalises header names -
# confirm).  Both database reads happen before the sheet is erased, so a
# failing query cannot leave an emptied worksheet behind.
sub _push_worksheet {
    my ($spreadsheet, $dbh, $ws_name, $export) = @_;

    my $table = get_table_name($ws_name, $dbh);
    print "worksheetname: $ws_name\n";
    if (!defined $table) {
        warn "No tracked table for worksheet $ws_name; skipping.\n";
        return;
    }

    my $worksheet = $spreadsheet->worksheet( { title => $ws_name } );
    if (!defined $worksheet) {
        warn "Worksheet $ws_name not found in the spreadsheet; skipping.\n";
        return;
    }

    my $records = $dbh->selectall_arrayref("SELECT * FROM $table;", { Slice => {} });
    if (!defined $records) {
        warn "Could not read $table; worksheet $ws_name left untouched.\n";
        return;
    }

    my $tracked_columns = $dbh->selectcol_arrayref(
        'SELECT column_name FROM gsheet_tracked_column WHERE table_id = '
            . '(SELECT id FROM gsheet_tracked_table WHERE table_name = ?)',
        undef, $table
    );
    my %is_tracked = map { $_ => 1 } @{ $tracked_columns || [] };

    erase_sheet($worksheet, $ws_name);

    print "Writing to $ws_name\n";
    for my $record (@{ $records }) {
        my %sheet_row;
        # Sorted so that, should two columns collapse to the same key, the
        # winner is deterministic.
        for my $column (sort keys %{ $record }) {
            next unless $is_tracked{$column};
            my $key = $column;
            $key =~ s/_//g;
            $sheet_row{$key} = $record->{$column};
        }
        $worksheet->add_row( \%sheet_row );
    }
    timestamp($table, $dbh, 'push');

    if ($export == 1) {
        export_table($dbh, $table);
    }
    return;
}

# Dump $table as tab-delimited text with a header line into
# $MIGGITDIR<table>_<yyyy-mm-dd>_<hh:mm:ss>.tsv.
# Aborts when the file cannot be opened or written.
sub export_table {
    my ($dbh, $table) = @_;

    my $now   = DateTime->now;
    my $efile = $MIGGITDIR . $table . '_' . $now->ymd . '_' . $now->hms . '.tsv';

    # "\t" is interpolated by Perl, so the server receives a literal tab
    # inside the E'' string.
    $dbh->do("COPY $table TO STDOUT CSV DELIMITER E'\t' HEADER;");

    # pg_getcopydata returns a negative number once the copy is finished;
    # only chunks from successful calls are kept.
    my @chunks;
    my $chunk = '';
    while ($dbh->pg_getcopydata(\$chunk) >= 0) {
        push @chunks, $chunk;
    }

    open(my $eout, '>', $efile) or abort("Could NOT open $efile.");
    print {$eout} $_ for @chunks;
    close($eout) or abort("Could NOT write $efile.");

    print "$efile written.\n";
    return;
}

# Dies unless gsheet_tracked_table exists in schema $MIGSCHEMA.
sub die_if_gsheet_tracked_table_does_not_exist {
    if (!check_for_gsheet_tracked_table()) {
        die "Table gsheet_tracked_table does not exist.  Bailing...\n";
    }
}

# Returns the elements of @$xa that also occur in @$xb, in the order of @$xa.
# A value repeated in @$xb is reported only once per occurrence in @$xa.
sub array_match {
    my ($xa, $xb) = @_;

    my %in_b = map { $_ => 1 } grep { defined } @{ $xb };
    return grep { defined $_ && $in_b{$_} } @{ $xa };
}

# Returns the column names of $schema_name.$table_name in table order.
# Uses its own short-lived database connection.
sub get_pg_column_headers {
    my ($table_name, $schema_name) = @_;

    my $dbh     = EMig::db_connect();
    my $headers = $dbh->selectcol_arrayref(
        'SELECT column_name FROM information_schema.columns '
            . 'WHERE table_schema = ? AND table_name = ? '
            . 'ORDER BY ordinal_position;',
        undef, $schema_name, $table_name
    );
    EMig::db_disconnect($dbh);

    return @{ $headers || [] };
}

# Deletes every row returned by $ws->rows except the first one, starting
# from the bottom of the sheet.
# NOTE(review): the preserved row is presumably the same non-data row that
# the pull direction skips - confirm.
sub erase_sheet {
    my ($ws, $ws_name) = @_;

    print "Erasing $ws_name.\n";
    my @rows = $ws->rows;
    shift @rows;    # the first row stays in the sheet
    $_->delete for reverse @rows;
    return;
}

# Returns a true value when $table_name exists in schema $MIGSCHEMA.
# Opens and closes its own database connection so that it can run before the
# main connection is established.
sub _tracking_table_exists {
    my ($table_name) = @_;

    my $dbh = EMig::db_connect();
    my $sth = $dbh->prepare(
        'SELECT EXISTS( SELECT 1 FROM information_schema.tables '
            . 'WHERE table_schema = ? AND table_name = ? );'
    );
    $sth->execute($MIGSCHEMA, $table_name)
        or die "Error checking for table ($table_name): "
            . ($dbh->errstr || 'unknown error');
    my @cols = $sth->fetchrow_array;
    $sth->finish;
    EMig::db_disconnect($dbh);

    return $cols[0];
}

# True when gsheet_tracked_table exists in schema $MIGSCHEMA.
sub check_for_gsheet_tracked_table {
    return _tracking_table_exists('gsheet_tracked_table');
}

# Dies unless gsheet_tracked_column exists in schema $MIGSCHEMA.
sub die_if_gsheet_tracked_column_does_not_exist {
    if (!check_for_gsheet_tracked_column()) {
        die "Table $MIGSCHEMA.gsheet_tracked_column does not exist.  Bailing...\n";
    }
}

# Returns the table_name registered for worksheet $worksheet, or undef when
# the worksheet is not tracked.  Should several rows match, the last one
# returned by the database wins.
sub get_table_name {
    my ($worksheet, $dbh) = @_;

    my $names = $dbh->selectcol_arrayref(
        'SELECT table_name FROM gsheet_tracked_table WHERE tab_name = ?;',
        undef, $worksheet
    );

    my $table_name;
    $table_name = $names->[-1] if $names && @{ $names };
    return $table_name;
}

#sub get_worksheet_name {
#    my $table = shift;
#    my $dbh = shift;
#
#    my $sql = 'SELECT tab_name FROM gsheet_tracked_table WHERE table_name = \'' . $table . '\';';
#    print "$sql \n";
#    my $sth = $dbh->prepare($sql);
#    my $xs = $sth->execute();
#    my $worksheet_name;
#    while (my @row = $sth->fetchrow_array) {
#        $worksheet_name = $row[0];
#    }
#
#    return $worksheet_name;
#}

# True when gsheet_tracked_column exists in schema $MIGSCHEMA.
sub check_for_gsheet_tracked_column {
    return _tracking_table_exists('gsheet_tracked_column');
}

# True when $name is usable verbatim as one column name: a plain identifier
# or an identifier already wrapped in double quotes, optionally surrounded by
# white space.  Anything else (undef, empty, punctuation, several tokens)
# is rejected.
sub _is_plain_identifier {
    my ($name) = @_;

    return 0 unless defined $name;
    return ( $name =~ m{\A \s* (?: [^\W\d]\w* | "(?:[^"]|"")+" ) \s* \z}x ) ? 1 : 0;
}

# Insert one worksheet row into $schema.$table.
#
#   $headers_ref  column names, in sheet order
#   $row_ref      cell values, in sheet order
#
# Values are bound through placeholders, so quotes and backslashes in cells
# are stored verbatim.  Empty cells (undef) and cells missing at the end of a
# short row are stored as empty strings.  Dies when a header is not a single
# SQL identifier, because headers have to be interpolated into the statement.
# Returns the result of $dbh->do.
sub insert_row {
    my ($schema, $table, $dbh, $headers_ref, $row_ref) = @_;

    my @headers = @{ $headers_ref };
    for my $header (@headers) {
        next if _is_plain_identifier($header);
        die "insert_row: unusable column header for $schema.$table\n";
    }

    my @values = map { defined $_ ? $_ : '' } @{ $row_ref };
    if (@values < @headers) {
        push @values, ('') x (@headers - @values);
    }

    my $column_list  = join(',', @headers);
    my $placeholders = join(',', ('?') x @values);

    return $dbh->do(
        "INSERT INTO $schema.$table ($column_list) VALUES ($placeholders);",
        undef, @values
    );
}

# Record the time of the last synchronisation of $table in
# gsheet_tracked_table: last_pulled when $action is 'pull', otherwise
# last_pushed.  Returns the result of $dbh->do.
sub timestamp {
    my ($table, $dbh, $action) = @_;

    my $column = $action eq 'pull' ? 'last_pulled' : 'last_pushed';
    return $dbh->do(
        "UPDATE gsheet_tracked_table SET $column = NOW() WHERE table_name = ?;",
        undef, $table
    );
}

# Remove every row from $table and report it on STDOUT.
sub truncate_table {
    my ($table, $dbh) = @_;

    $dbh->do("TRUNCATE TABLE $table;");
    print "Table $table truncated.\n";
}

# Print "$0: $msg" to STDERR and terminate with exit status 1.
sub abort {
    my $msg = shift;
    print STDERR "$0: $msg", "\n";
    exit 1;
}

# Open the Google spreadsheet titled $MIGSCHEMA.
#
#   $clientid, $clientsecret  OAuth2 client credentials
#   $sessionfile              Storable file holding a saved token session;
#                             the first '~' in the path is replaced by
#                             $ENV{HOME}
#
# Returns whatever $service->spreadsheet yields for that title; the caller
# treats undef as "could not connect".
sub connect_gsheet {
    my ($clientid, $clientsecret, $sessionfile) = @_;

    my $oauth2 = Net::Google::DataAPI::Auth::OAuth2->new(
        client_id     => $clientid,
        client_secret => $clientsecret,
        scope         => ['http://spreadsheets.google.com/feeds/'],
        redirect_uri  => 'https://developers.google.com/oauthplayground',
    );

    $sessionfile =~ s/~/$ENV{HOME}/;

    my $session        = retrieve($sessionfile);
    my $restored_token = Net::OAuth2::AccessToken->session_thaw(
        $session,
        auto_refresh => 1,
        profile      => $oauth2->oauth2_webserver,
    );
    $oauth2->access_token($restored_token);

    my $service     = Net::Google::Spreadsheets->new(auth => $oauth2);
    my $spreadsheet = $service->spreadsheet( { title => $MIGSCHEMA } );
    return $spreadsheet;
}