Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Does it work with PostgreSQL INSERT statement dumps? #12

Open
insinfo opened this issue Oct 3, 2024 · 1 comment
Open

Does it work with PostgreSQL INSERT statement dumps? #12

insinfo opened this issue Oct 3, 2024 · 1 comment

Comments

@insinfo
Copy link

insinfo commented Oct 3, 2024

I'm trying to read a PostgreSQL dump file that contains INSERT statements and convert them to COPY statements.
I would like to know whether this package could help me with this.

#!/usr/bin/env python3
import sys
import sqlparse
from sqlparse.sql import Identifier, Function, Values
from sqlparse.tokens import DML, Keyword

def main():
    """Run the converter from the command line.

    Expects exactly two command-line arguments: the path of the SQL dump to
    read and the path of the file to write. Prints a usage line and returns
    when the argument count is wrong, and prints an error message and returns
    when a ``FileNotFoundError`` is raised while opening or processing the
    files.
    """
    argv = sys.argv
    if len(argv) != 3:
        print('Uso: python pg_dump_converter.py entrada.sql saida.sql')
        return

    _, source_path, target_path = argv

    try:
        # The input is opened first, so a missing input file never creates
        # (or truncates) the output file.
        with open(source_path, 'r', encoding='utf-8') as source:
            with open(target_path, 'w', encoding='utf-8') as target:
                process_file(source, target)
    except FileNotFoundError:
        print('O arquivo de entrada não existe.')

def process_file(input_file, output_file):
    """Convert a stream of SQL statements, folding INSERTs into COPY blocks.

    Consecutive INSERT statements that target the same table with the same
    column list are accumulated and written as one ``COPY ... FROM stdin``
    block. Every other statement is written through unchanged (stripped, one
    per line) after any pending COPY block has been flushed.

    An INSERT from which no table name or no value rows could be extracted
    (for example ``INSERT ... SELECT``) is also written through unchanged
    instead of being silently dropped from the output.

    Args:
        input_file: Readable text stream containing the SQL dump.
        output_file: Writable text stream that receives the converted dump.
    """
    current_table = None
    columns = None
    values_list = []

    for statement in sqlparse.parsestream(input_file):
        table_name, cols, values = None, None, []
        if statement.get_type() == 'INSERT':
            table_name, cols, values = process_insert_statement(statement)

        if table_name and values:
            if current_table != table_name or columns != cols:
                # Target changed: write out the rows accumulated so far.
                if current_table and values_list:
                    write_copy_statement(output_file, current_table, columns, values_list)
                values_list = []
                current_table = table_name
                columns = cols
            values_list.extend(values)
            continue

        # Not convertible (non-INSERT, or an INSERT we could not decompose):
        # flush pending rows first so statement order is preserved, then pass
        # the original statement through verbatim.
        if current_table and values_list:
            write_copy_statement(output_file, current_table, columns, values_list)
        values_list = []
        current_table = None
        columns = None
        output_file.write(str(statement).strip() + '\n')

    # Write whatever is still pending at end of input.
    if current_table and values_list:
        write_copy_statement(output_file, current_table, columns, values_list)

def process_insert_statement(statement):
    """Extract table name, column names and value rows from an INSERT.

    Walks the statement's top-level tokens once and returns a tuple
    ``(table_name, columns, values)`` where ``table_name`` is a string
    (empty if no identifier was seen), ``columns`` is a list of column names
    with surrounding double quotes stripped, and ``values`` is a list of rows
    as produced by ``parse_values``.

    NOTE(review): this relies on the table appearing as an ``Identifier`` and
    the column list as a separate top-level ``Parenthesis``; sqlparse may
    group ``table (cols)`` differently (e.g. as a ``Function``) — confirm
    against the sqlparse version in use.
    """
    parts = statement.tokens
    table_name = ''
    columns = []
    rows = []
    skip_next = False

    for position, part in enumerate(parts):
        if skip_next:
            # This token was already consumed as the Values group that
            # followed a VALUES keyword.
            skip_next = False
            continue

        if part.ttype is DML and part.value.upper() == 'INSERT':
            continue

        if isinstance(part, Identifier):
            table_name = part.get_name()
        elif part.ttype is Keyword and part.value.upper() == 'VALUES':
            has_following = position + 1 < len(parts)
            if has_following and isinstance(parts[position + 1], Values):
                rows.extend(parse_values(parts[position + 1]))
                skip_next = True
        elif isinstance(part, sqlparse.sql.Parenthesis):
            if columns:
                # Column list already known, so this is another value row.
                rows.extend(parse_values(part))
            else:
                # First parenthesis is assumed to be the column list.
                columns = [str(ident).strip('"') for ident in part.get_identifiers()]

    return table_name, columns, rows

def parse_values(token):
    """Return the value rows contained in ``token`` as a list of lists.

    A ``Values`` group contributes one row per sublist it contains; a single
    ``Parenthesis`` contributes exactly one row. Any other token yields an
    empty list.
    """
    if isinstance(token, Values):
        return [parse_value_list(group) for group in token.get_sublists()]
    if isinstance(token, sqlparse.sql.Parenthesis):
        return [parse_value_list(token)]
    return []

def parse_value_list(parenthesis):
    """Convert the tokens of one parenthesised row into COPY field strings.

    Identifier lists are expanded element by element; whitespace and
    punctuation tokens are skipped; every remaining token is converted with
    ``process_value``. Returns the converted fields as a list of strings.
    """
    fields = []
    for item in parenthesis.tokens:
        if isinstance(item, sqlparse.sql.IdentifierList):
            fields.extend(process_value(entry) for entry in item.get_identifiers())
            continue
        if item.is_whitespace or item.ttype == sqlparse.tokens.Punctuation:
            continue
        fields.append(process_value(item))
    return fields

# Characters that must be backslash-escaped inside a COPY text-format field.
# Tab is included because it is the column delimiter used when rows are
# written; leaving it raw would split one value into two columns.
_COPY_TEXT_ESCAPES = str.maketrans({
    '\\': '\\\\',
    '\t': '\\t',
    '\n': '\\n',
    '\r': '\\r',
})


def process_value(token):
    """Convert one SQL literal token into its COPY text-format field.

    Args:
        token: Object with a ``value`` attribute holding the literal's SQL
            text (e.g. ``'abc'``, ``42``, ``NULL``).

    Returns:
        ``\\N`` for an unquoted NULL (case-insensitive); for a single-quoted
        string, the unquoted content with doubled quotes collapsed and
        backslash, tab, newline and carriage return escaped; otherwise the
        stripped text unchanged.
    """
    value = token.value.strip()
    if value.upper() == 'NULL':
        return '\\N'
    if value.startswith("'") and value.endswith("'"):
        # Drop the surrounding quotes and undo SQL quote doubling, then
        # escape in a single pass so inserted backslashes are not re-escaped.
        unquoted = value[1:-1].replace("''", "'")
        return unquoted.translate(_COPY_TEXT_ESCAPES)
    return value

def write_copy_statement(output_file, table_name, columns, values_list):
    """Write one ``COPY ... FROM stdin`` block to ``output_file``.

    Args:
        output_file: Writable text stream.
        table_name: Name of the target table, inserted verbatim.
        columns: Sequence of column names. When empty (or ``None``) the
            column list is omitted entirely, because ``COPY t ()`` is not
            valid SQL.
        values_list: Iterable of rows, each a sequence of already-escaped
            field strings; fields are joined with tabs, one row per line.

    The block is terminated with the ``\\.`` end-of-data marker.
    """
    column_clause = f' ({", ".join(columns)})' if columns else ''
    lines = [f'COPY {table_name}{column_clause} FROM stdin;\n']
    lines.extend('\t'.join(values) + '\n' for values in values_list)
    lines.append('\\.\n')
    output_file.writelines(lines)

# Script entry point: the converter only runs when the file is executed
# directly.
if __name__ == '__main__':
    # NOTE(review): sqlparse is also imported unconditionally near the top of
    # the file, so a missing package would presumably fail there before this
    # guard is reached — confirm whether that import should be deferred.
    try:
        import sqlparse
    except ImportError:
        print('A biblioteca sqlparse é necessária. Instale-a usando "pip install sqlparse".')
    else:
        # The import succeeded, so run the command-line converter.
        main()
@gmr
Copy link
Owner

gmr commented Nov 4, 2024

If the dumps are in custom format, it should work (i.e. `pg_dump -Fc`).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants