SQLの字句解析器(tokenizer)を実装する

ZOZO Advent Calendar 2024 11日目の記事になります。
今回はSQLの字句解析器(tokenizer)の実装を行ってみたいと思います。

はじめに

例えばSQL内で参照されているテーブル名を抜き出したい場合があるとします。簡単に実装するのであれば正規表現で抜き出す方法があると思います。しかしBigQueryを例に挙げると、FROM句の後にサブクエリやプロジェクト名を省略した形でSELECTするテーブルを書くことができ、すべての条件を網羅しようとすると正規表現が複雑になります。

cloud.google.com

今回SQLの字句解析器(tokenizer)を実装し、SQL内の単語をtokenへ分割してFROM句のテーブル名を抜き出せるようにしたいと思います。

使用環境

Python 3.13.1

tokenの分割について

字句解析器を実装する前にtoken化した単語をどこまで切り分けるのか決めます。利用用途によって切り分け方を変えると良いのですが、今回はFROM句後のテーブル名を抜き出す一例として切り分け方を紹介したいと思います。今回の実装では下記表のような形で単語とtokenの種類で抜き出します。

抜き出す単語 token名
--, #, /* , */ Comment
空白, 改行 NewlineAndWhitespace
FROM, JOIN FromORJoin
, Comma
その他の単語 Something

実装

字句解析を行うTokenizerクラスを下記の様に実装しました。

import re

class Tokenizer:
    def __init__(self):
        self._SQL_REGEX = [
            # Comment
            (r'(--|#).*?(\r\n|\r|\n|$)', "Comment"),
            (r'(?s)/\*.*?\*/', "Comment"),
            # Newline and Whitespace
            (r'(\r\n|\r|\n|\s+?)', "NewlineAndWhitespace"),
            # FROM and JOIN
            (r'(?i)(JOIN|FROM)\b', "FromORJoin"),
            # Comma
            (r',', "Comma"),
            # Other
            (r'(.+?)(?=\s|\n|$)', "Something"),         
        ]

    def _match_token(self, sql, index):
        for pattern, token in self._SQL_REGEX:
            match_token_pattern = re.compile(pattern).match(sql, index)
            if match_token_pattern:
                return token, match_token_pattern
        raise RuntimeError(f"Did not match any token pattern: '{sql}'")

    def lexer(self, sql):
        match_tokens = list()
        index = 0
        sql_length = len(sql)
        while index < sql_length:
            token_type, match_sql_text = self._match_token(sql, index)
            match_tokens.append({"token_type": token_type, "text": match_sql_text.group()})
            index += match_sql_text.end() - index
        return match_tokens

クラスの利用イメージは下記コードのようになります。SQLの文字列をlexer関数に渡すとtokenの種類と対応する文字列がlistで返されます。

def main():
    tokenizer = Tokenizer()
    sql = "SELECT * FROM table1 JOIN table2 ON table1.id = table2.id WHERE table1.id = 1"
    tokens = tokenizer.lexer(sql)
    for token in tokens:
        print(token)


if __name__ == "__main__":
    main()

上記コードを実行した結果になります。

$ python tokenizer.py
{'token_type': 'NewlineAndWhitespace', 'text': '\n'}
{'token_type': 'NewlineAndWhitespace', 'text': ' '}
{'token_type': 'NewlineAndWhitespace', 'text': ' '}
{'token_type': 'NewlineAndWhitespace', 'text': ' '}
{'token_type': 'NewlineAndWhitespace', 'text': ' '}
{'token_type': 'Comment', 'text': '-- テスト\n'}
{'token_type': 'NewlineAndWhitespace', 'text': ' '}
{'token_type': 'NewlineAndWhitespace', 'text': ' '}
{'token_type': 'NewlineAndWhitespace', 'text': ' '}
{'token_type': 'NewlineAndWhitespace', 'text': ' '}
{'token_type': 'Something', 'text': 'SELECT'}
{'token_type': 'NewlineAndWhitespace', 'text': ' '}
{'token_type': 'Something', 'text': '*'}
{'token_type': 'NewlineAndWhitespace', 'text': ' '}
{'token_type': 'FromORJoin', 'text': 'FROM'}
{'token_type': 'NewlineAndWhitespace', 'text': ' '}
{'token_type': 'Something', 'text': 'table1'}
{'token_type': 'NewlineAndWhitespace', 'text': ' '}
{'token_type': 'FromORJoin', 'text': 'JOIN'}
{'token_type': 'NewlineAndWhitespace', 'text': ' '}
{'token_type': 'Something', 'text': 'table2'}
{'token_type': 'NewlineAndWhitespace', 'text': ' '}
{'token_type': 'Something', 'text': 'WHERE'}
{'token_type': 'NewlineAndWhitespace', 'text': ' '}
{'token_type': 'Something', 'text': 'column1'}
{'token_type': 'NewlineAndWhitespace', 'text': ' '}
{'token_type': 'Something', 'text': '='}
{'token_type': 'NewlineAndWhitespace', 'text': ' '}
{'token_type': 'Something', 'text': "'value1'"}
{'token_type': 'NewlineAndWhitespace', 'text': '\n'}
{'token_type': 'NewlineAndWhitespace', 'text': ' '}
{'token_type': 'NewlineAndWhitespace', 'text': ' '}
{'token_type': 'NewlineAndWhitespace', 'text': ' '}
{'token_type': 'NewlineAndWhitespace', 'text': ' '}

それぞれのtokenに対応した文字列を抜き出せることが確認できました。

次は抜き出したtokenの種類をもとにテーブル名を抜き出します。 先ほど作成したTokenizerクラスの出力ではNewlineAndWhitespaceCommentCommaのtokenを出力してましたが、テーブル名を抜き出すだけの場合は必要ないので出力しないようにします。

import re

class Tokenizer:
    def __init__(self):
        self._SQL_REGEX = [
            # Comment
            (r'(--|#).*?(\r\n|\r|\n|$)', "Comment"),
            (r'(?s)/\*.*?\*/', "Comment"),
            # Newline and Whitespace
            (r'(\r\n|\r|\n|\s+?)', "NewlineAndWhitespace"),
            # FROM and JOIN
            (r'(?i)(JOIN|FROM)\b', "FromORJoin"),
            # Comma
            (r',', "Comma"),
            # Other
            (r'(.+?)(?=\s|\n|$)', "Something"),         
        ]

    def _match_token(self, sql, index):
        for pattern, token in self._SQL_REGEX:
            match_token_pattern = re.compile(pattern).match(sql, index)
            if match_token_pattern:
                return token, match_token_pattern
        raise RuntimeError(f"Did not match any token pattern: '{sql}'")

    def lexer(self, sql):
        match_tokens = list()
        index = 0
        sql_length = len(sql)
        while index < sql_length:
            token_type, match_sql_text = self._match_token(sql, index)
            # FROM句後のテーブル名を抜き出すため、NewlineAndWhitespace, Comment, Commaは無視する
            if token_type != "NewlineAndWhitespace" and token_type != "Comment" and token_type != "Comma":
              match_tokens.append({"token_type": token_type, "text": match_sql_text.group()})
            index += match_sql_text.end() - index
        return match_tokens



def main():
    tokenizer = Tokenizer()
    sql = """
    -- テスト
    SELECT * FROM table1 JOIN table2 WHERE column1 = 'value1'
    """
    tokens = tokenizer.lexer(sql)
    table_ids = list()
    index = 0
    tokens_length = len(tokens)
    while index < tokens_length:
      token_type = tokens[index]["token_type"]
      # FROMかJOINの次の単語を抜き出す
      if token_type == "FromORJoin":
        table_ids.append(tokens[index + 1]["text"])
        index += 2
      else:
        index += 1
    print(table_ids)
if __name__ == "__main__":
    main()

出力結果は下記になります。SQLの文字列に含まれたテーブル名が抜き出せていることが確認できました。

$ python tokenizer.py
['table1', 'table2']

まとめ

字句解析を実装して、うまくテーブル名を抜き出せました。 ただサブクエリやextract関数で利用されるFROMについて考慮する場合はもう少し抜き出し方を工夫する必要があります。 tokenの種類を変更することでテーブル名を抜き出す以外にも応用ができそうなので色々利用用途を模索するのも面白そうかなと思いました。