Class::DBIでCoro::Mysqlを使う

最近ちょっとCoroを触っていて、Class::DBIでCoro::Mysql使いたいなと思った。

■The architecture of Coro::MySQL によると Class::DBI とかそのまま使えるとあるけど、どうやるとそのまま使えるかわからなかったのでいろいろ調べたりCoro::Mysqlのソース読んだりした。

結局どうやるとそのままCDBI使えるかわからなかった。単に Class::DBI->db_Main が返してくるDBハンドルを Coro::Mysql::unblock してやるだけではうまく動かない。手元の環境ではinsertとかはできるけどなんでか検索系が変な動きをした。あと、この手法でやる場合、Class::DBIではクラス変数としてdbhを持ってるので、結局プロセス内で1スレッドしかDBアクセスできないことになり、ちょっと僕のやりたい事とずれる。僕はスレッドごとに非同期にDBに読み書きできるようにしたい。1接続だけってのはちょっと非同期実行のうまみが少なくて切ない。接続数の上限はなにがしか管理するにせよ。

要件としては

  • Coroスレッドごとに必要に応じてdbhを持ちたい
  • 既存のソースを使いたいので、Class::DBIは使いたい

そんで、Class::DBIに複数接続を持たせるにはどうしたらいいか調べたら
Class::DBIで複数データベースを扱う+register_cleanup
Class::DBI::Plugin::MultiDatabases
というのが見つかった。
そしてClass::DBIのWikiの"Using multiple databases"によれば、db_Mainを上書きしてやるとよいみたいだ。ただし db_Main を上書きした場合、dbi_commit と dbi_rollback がちゃんと動かなくなるので、適切なdbhからcommit/rollbackするようにしてやらないといけないらしい。

というわけでそういうのを書いてみた。

package Class::DBI::Coro::Mysql;
use strict;
use warnings; 
use base 'Class::DBI::mysql';

use DBI;
use Coro;
use Coro::Mysql;
use Data::Dumper;

__PACKAGE__->mk_classdata('_coro_dsn_info');
__PACKAGE__->mk_classdata('_coro_dbh_hash');

sub connection {
    my ($class, $dsn, $user, $pass, $attr) = @_;
    $class->_coro_dsn_info([$dsn, $user, $pass, $attr]);
    $class->_coro_dbh_hash({});
}

sub db_Main {
    my $class = shift;
    my $me = $Coro::current;

    if( my $dbh = $class->_coro_dbh_hash->{$me} ){
        return $dbh if $dbh->FETCH('Active');
        $dbh->disconnect;
    }

    my $dbh = Coro::Mysql::unblock(DBI->connect(@{ $class->_coro_dsn_info }));
    $me->on_destroy(sub {
            if( exists $class->_coro_dbh_hash->{$me} ){
                $class->_coro_dbh_hash->{$me}->disconnect;
                delete $class->_coro_dbh_hash->{$me};
            }
    });
    $class->_coro_dbh_hash->{$me} = $dbh;
}

sub dbi_commit {
    my $class = shift;
    $class->db_Main->commit(@_);
}   
    
sub dbi_rollback {
    my $class = shift;
    $class->db_Main->rollback(@_);
}   
    
1; 

connection までも乗っとってしまうのはどうなんだろうとも思ったけど、結局 db_Main を乗っとるた時点で Ima::DBIが作った db_Main はアクセスされないんだから、普通の Class::DBI と同じように使えるし、connection も上書きしてしまった。
あと、なんかきちんと $dbh->disconnect を呼ばずに(Perlに回収されると?)セグメンテーションフォルトする。しょうがないので、Coro::on_destroy に disconnect を登録した。(id:malaさんが書いてた、なんかunpatchのところで死ぬ。perlのバージョンが古いからかも。と同じ問題?)
使い方は普通の Class::DBI::mysql と同じである。

use strict;

package TestDB;
#普通の Class::DBI::mysql を継承するとブロックする
#use base qw/Class::DBI::mysql/;
use base qw/Class::DBI::Coro::Mysql/;

package TestDB::Sandbox;
use base qw/TestDB/;

__PACKAGE__->table('sandbox');
__PACKAGE__->columns(All=>qw/id sand created_on updated_on/);
__PACKAGE__->set_sql("sleep" => qq{ SELECT sleep(?) }); 

package Main;

use Coro;
use Data::Dumper;

TestDB->connection(
    "dbi:mysql:ほげほげほげ",
    "ほげほげほげ",
    "ほげほげほげ",
    {AutoCommit=>0}
);

async_pool {
    print "sleep\n";
    TestDB::Sandbox->search_sleep(5);
    print "wake up\n";
    $Coro::main->ready;
};

async_pool {
    eval {
        print "insert\n";
        my $s = TestDB::Sandbox->create({});
        $s->set(sand=>"hogehoge");
        $s->update;
        $s->dbi_commit;
        printf("inserted [%s]\n", $s->id);
    };  
    if($@){
        warn $@; 
        TestDB->dbi_rollback;
    }   
};

schedule;
print "end\n";

非同期実行の場合は select sleep(5) でブロックせずに、

sleep
insert
inserted [\d+]
wake up
end

みたいになる。