No changes between revisions
/asterisk-activecalls/trunk/POE/Component/Client/Asterisk/.exists |
--- POE/Component/Client/Asterisk/Manager.pm (nonexistent) |
+++ POE/Component/Client/Asterisk/Manager.pm (revision 1372) |
@@ -0,0 +1,485 @@ |
+package POE::Component::Client::Asterisk::Manager; |
+ |
+###################################################################### |
+### POE::Component::Client::Asterisk::Manager |
+### David Davis (xantus@cpan.org) |
+### |
+### Copyright (c) 2003-2005 David Davis and Teknikill. All Rights |
+### Reserved. This module is free software; you can redistribute it |
+### and/or modify it under the same terms as Perl itself. |
+###################################################################### |
+ |
+use strict; |
+use warnings; |
+ |
+our $VERSION = '0.08'; |
+ |
+use Carp qw(croak); |
+use POE qw( Component::Client::TCP ); |
+use Digest::MD5; |
+ |
+sub DEBUG { 0 } |
+ |
+sub new { |
+ my $package = shift; |
+ croak "$package requires an even number of parameters" if @_ % 2; |
+ my %params = @_; |
+ my $alias = $params{'Alias'}; |
+ |
+ $alias = 'asterisk_client' unless defined($alias) and length($alias); |
+ |
+ my $listen_port = $params{listen_port} || 5038; |
+ |
+ POE::Session->create( |
+ #options => {trace=>1}, |
+ args => [ %params ], |
+ package_states => [ |
+ 'POE::Component::Client::Asterisk::Manager' => { |
+ _start => '_start', |
+ _stop => '_stop', |
+ signals => 'signals', |
+ } |
+ ], |
+ inline_states => $params{inline_states}, |
+# { |
+# _default => sub { |
+# print STDERR "$_[STATE] called\n"; |
+# }, |
+# }, |
+ ); |
+ |
+ return 1; |
+} |
+ |
+sub _start { |
+ my ($kernel, $session, $heap) = @_[KERNEL, SESSION, HEAP]; |
+ my %params = splice(@_,ARG0); |
+ |
+ if (ref($params{Options}) eq 'HASH') { |
+ $session->option( %{$params{Options}} ); |
+ } |
+ |
+ $params{reconnect_time} = $params{reconnect_time} || 5; |
+ $params{requeue_posts} = $params{requeue_posts} || undef; |
+ |
+ $kernel->alias_set($params{Alias}); |
+ |
+ # watch for SIGINT |
+# $kernel->sig('INT', 'signals'); |
+ |
+ $heap->{client} = POE::Component::Client::TCP->new( |
+ RemoteAddress => $params{RemoteHost}, |
+ RemotePort => $params{RemotePort}, |
+ # no longer a seperate package - see below |
+ Filter => "POE::Filter::Asterisk::Manager", |
+ Alias => "$params{Alias}_client", |
+ Args => [ \%params ], |
+ Started => sub { |
+ $_[HEAP]->{params} = $_[ARG0]; |
+ }, |
+ Disconnected => sub { |
+ $_[KERNEL]->delay(reconnect => $_[HEAP]->{params}->{reconnect_time}); |
+ }, |
+ Connected => sub { |
+ my $heap = $_[HEAP]; |
+ DEBUG && print STDERR sprintf("connected to %s:%s\n",$heap->{params}->{RemoteHost},$heap->{params}->{RemotePort}); |
+ $heap->{_connected} = 0; |
+ $heap->{_logged_in} = 0; |
+ $heap->{_auth_stage} = 0; |
+ $_[KERNEL]->delay( recv_timeout => 5 ); |
+ |
+ if ($heap->{params}->{Astmanproxy}) { |
+ # For astmanproxy? Don't wait for a response |
+ $heap->{_connected} = 1; |
+ $kernel->call($_[SESSION] => login => splice(@_,ARG0)); |
+ } |
+ }, |
+ ConnectError => sub { |
+ my $heap = $_[HEAP]; |
+ DEBUG && print STDERR sprintf("could not connect to %s:%s, reconnecting in %s seconds...\n" |
+ ,$heap->{params}->{RemoteHost},$heap->{params}->{RemotePort},$heap->{params}->{reconnect_time}); |
+ $_[KERNEL]->delay(reconnect => $heap->{params}->{reconnect_time}); |
+ }, |
+ ServerInput => sub { |
+ my ( $kernel, $heap, $input ) = @_[KERNEL, HEAP, ARG0]; |
+ |
+ DEBUG && do { |
+ require Data::Dumper; |
+ print Data::Dumper->Dump([$input],['input']); |
+ }; |
+ |
+ if ($heap->{_logged_in} == 0 && $heap->{_connected} == 0) { |
+ $kernel->delay( recv_timeout => 5 ); |
+ if ($input->{acm_version}) { |
+ $heap->{_version} = $input->{acm_version}; |
+ $heap->{_connected} = 1; |
+ $kernel->call($_[SESSION] => login => splice(@_,ARG0)); |
+ } else { |
+ print STDERR "Invalid Protocol (wrong port?)\n"; |
+ $kernel->yield("shutdown"); |
+ } |
+ } elsif ($heap->{_connected} == 1 && $heap->{_logged_in} == 0) { |
+ $kernel->call($_[SESSION] => login => splice(@_,ARG0)); |
+ } elsif ($heap->{_logged_in} == 1) { |
+ $kernel->call($_[SESSION] => callbacks => splice(@_,ARG0)); |
+ } |
+ }, |
+ |
+ InlineStates => { |
+ _put => sub { |
+ my $heap = $_[HEAP]; |
+ if ($heap->{server}) { |
+ $heap->{server}->put($_[ARG0]); |
+ } else { |
+ if ($heap->{requeue_posts}) { |
+ push(@{$heap->{queued}},$_[ARG0]); |
+ } else { |
+ print STDERR "cannot send when not connected! -ignored-\n"; |
+ } |
+ } |
+ }, |
+ login_complete => sub { |
+ my ( $kernel, $heap ) = @_[KERNEL, HEAP]; |
+ DEBUG && print STDERR "logged in and ready to process events\n"; |
+ # call the _connected state |
+ $kernel->yield("_connected" => splice(@_,ARG0)); |
+ }, |
+ recv_timeout => sub { |
+ my ( $kernel, $heap ) = @_[KERNEL, HEAP]; |
+ unless ($heap->{_connected} == 1) { |
+ print STDERR "Timeout waiting for response\n"; |
+ $heap->{_connected} = 0; |
+ $heap->{_logged_in} = 0; |
+ $kernel->yield("shutdown"); |
+ } |
+ }, |
+ login => sub { |
+ my ($kernel, $heap, $input) = @_[KERNEL, HEAP, ARG0]; |
+ if ($heap->{_logged_in} == 1) { |
+ # shouldn't get here |
+ DEBUG && print STDERR "Login called when already logged in\n"; |
+ #$kernel->yield(callbacks => splice(@_,ARG0)); |
+ return; |
+ } |
+ if ($heap->{_auth_stage} == 0) { |
+ $heap->{server}->put({'Action' => 'Challenge', 'AuthType' => 'MD5'}); |
+ $heap->{_auth_stage} = 1; |
+ } elsif ($heap->{_auth_stage} == 1) { |
+ unless ($input->{Response} && lc($input->{Response}) eq 'success') { |
+ print STDERR "AuthType MD5 may not be supported\n"; |
+ $kernel->yield("shutdown"); |
+ return; |
+ } |
+ if ($input->{Challenge}) { |
+ if (! defined $heap->{params}->{Password}) { |
+ print STDERR "No password provided\n"; |
+ $kernel->yield("shutdown"); |
+ return; |
+ } |
+ |
+ my $digest = Digest::MD5::md5_hex("$input->{Challenge}$heap->{params}->{Password}"); |
+ $heap->{server}->put({'Action' => 'Login', 'AuthType' => 'MD5', 'Username' => $heap->{params}->{Username}, 'Key' => $digest }); |
+ $heap->{_auth_stage} = 2; |
+ } |
+ } elsif ($heap->{_auth_stage} == 2) { |
+ if ($input->{Message} && lc($input->{Message}) eq 'authentication accepted') { |
+ delete $heap->{_auth_stage}; |
+ $heap->{_logged_in} = 1; |
+ # I remembered inline_states not working (above), so i put this in |
+ foreach my $k (keys %{$heap->{params}->{inline_states}}) { |
+ $kernel->state($k => $heap->{params}->{inline_states}{$k}); |
+ } |
+ if (ref($heap->{queued}) eq 'ARRAY') { |
+ foreach my $a (@{$heap->{queued}}) { |
+ $heap->{server}->put($a); |
+ } |
+ delete $heap->{queued}; |
+ } |
+ $kernel->yield(login_complete => splice(@_,ARG0)); |
+ } elsif ($input->{Message} && lc($input->{Message}) eq 'authentication failed') { |
+ print STDERR "Authentication failed.\n"; |
+ $heap->{_connected} = 0; |
+ $heap->{_logged_in} = 0; |
+ $kernel->yield("shutdown"); |
+ } |
+ } |
+ }, |
+ callbacks => sub { |
+ my ($kernel, $heap, $session, $input) = @_[KERNEL, HEAP, SESSION, ARG0]; |
+ # TODO this stuff needs some work |
+ next unless (ref($input)); |
+ my $qual = 0; |
+ foreach my $k (keys %{$heap->{params}->{CallBacks}}) { |
+ my $match = 0; |
+ if (ref($heap->{params}->{CallBacks}{$k}) eq 'HASH') { |
+ foreach my $c (keys %{$heap->{params}->{CallBacks}{$k}}) { |
+ last if ($match == 1); |
+ if (exists($input->{$c}) && $heap->{params}->{CallBacks}{$k}{$c} eq $input->{$c}) { |
+ $match = 2; |
+ $qual++; |
+ } else { |
+ $match = 1; |
+ } |
+ } |
+ # matched ALL of the callback (not 2 of them like it looks like) |
+ if ($match == 2) { |
+ # callback good |
+ DEBUG && print STDERR "callback $k is good\n"; |
+ $kernel->yield($k => $input); |
+ } |
+ } elsif ($heap->{params}->{CallBacks}{$k} eq ':all' || $heap->{params}->{CallBacks}{$k} eq 'default') { |
+ $kernel->yield($k => $input); |
+ } else { |
+ print STDERR "Incorrectly written callback $k\n"; |
+ } |
+ } |
+ # use the :all qualifier now |
+ #if ($qual == 0) { |
+ # $kernel->yield("default" => splice(@_,ARG0)); |
+ #} |
+ }, |
+ }, |
+ ); |
+ DEBUG && print STDERR "Client started.\n"; |
+} |
+ |
+sub _stop { |
+ $_[KERNEL]->yield("shutdown"); |
+ DEBUG && print STDERR "Client stopped.\n"; |
+} |
+ |
+# Handle incoming signals (INT) |
+ |
+# TODO disconnect gracefully |
+sub signals { |
+ my $signal_name = $_[ARG0]; |
+ |
+# DEBUG && print STDERR "Client caught SIG$signal_name\n"; |
+ |
+ # do not handle the signal |
+ return 0; |
+} |
+ |
+ |
+1; |
+ |
+package POE::Filter::Asterisk::Manager; |
+ |
+use strict; |
+use Carp qw(croak); |
+ |
+sub DEBUG { 0 }; |
+ |
+sub new { |
+ my $type = shift; |
+ my $self = { |
+ buffer => '', |
+ crlf => "\x0D\x0A", |
+ }; |
+ bless $self, $type; |
+ $self; |
+} |
+ |
+sub get { |
+ my ($self, $stream) = @_; |
+ |
+ # Accumulate data in a framing buffer. |
+ $self->{buffer} .= join('', @$stream); |
+ |
+ my $many = []; |
+ while (1) { |
+ my $input = $self->get_one([]); |
+ if ($input) { |
+ push(@$many,@$input); |
+ } else { |
+ last; |
+ } |
+ } |
+ |
+ return $many; |
+} |
+ |
+sub get_one_start { |
+ my ($self, $stream) = @_; |
+ |
+ DEBUG && do { |
+ my $temp = join '', @$stream; |
+ $temp = unpack 'H*', $temp; |
+ warn "got some raw data: $temp\n"; |
+ }; |
+ |
+ # Accumulate data in a framing buffer. |
+ $self->{buffer} .= join('', @$stream); |
+} |
+ |
+sub get_one { |
+ my $self = shift; |
+ |
+ return [] if ($self->{finish}); |
+ |
+ |
+ if ($self->{buffer} =~ s#^(?:Asterisk|Aefirion) Call Manager(?: Proxy)?/(\d+\.\d+\w*)$self->{crlf}##is) { |
+ return [{ acm_version => $1 }]; |
+ } |
+ |
+ return [] unless ($self->{crlf}); |
+ my $crlf = $self->{crlf}; |
+ |
+ # collect lines in buffer until we find a double line |
+ return [] unless($self->{buffer} =~ m/${crlf}${crlf}/s); |
+ |
+ |
+ $self->{buffer} =~ s/(^.*?)(${crlf}${crlf})//s; |
+ |
+ my $buf = "$1${crlf}"; |
+ |
+ my $kv = {}; |
+ |
+ foreach my $line (split(/(:?${crlf})/,$buf)) { |
+ my $tmp = $line; |
+ $tmp =~ s/\r|\n//g; |
+ next unless($tmp); |
+ |
+ if ($line =~ m/([\w\-]+)\s*:\s+(.*)/) { |
+ my $key = $1; |
+ my $val = $2; |
+ DEBUG && print "recv key $key: $val\n"; |
+ |
+ if ($key eq 'Variable',) { |
+ for my $v( split /\r/, $val ) { |
+ $v =~ s/^Variable\:\s*//; |
+ my @parts = split /=/, $v; |
+ $kv->{$key}{$parts[0]} = $parts[1]; |
+ DEBUG && print " recv variable: $parts[0] => $parts[1]\n"; |
+ } |
+ } else { |
+ $kv->{$key} = $val; |
+ } |
+ } else { |
+ $kv->{content} .= "$line"; |
+ } |
+ } |
+ |
+ return (keys %$kv) ? [$kv] : []; |
+} |
+ |
+sub put { |
+ my ($self, $hrefs) = @_; |
+ my @raw; |
+ for my $i ( 0 .. $#{$hrefs} ) { |
+ if (ref($hrefs->[$i]) eq 'HASH') { |
+ foreach my $k (keys %{$hrefs->[$i]}) { |
+ DEBUG && print "send key $k: $hrefs->[$i]{$k}\n"; |
+ push(@raw,"$k: $hrefs->[$i]{$k}$self->{crlf}"); |
+ } |
+ } elsif (ref($hrefs->[$i]) eq 'ARRAY') { |
+ push(@raw, join("$self->{crlf}", @{$hrefs->[$i]}, "")); |
+ } elsif (ref($hrefs->[$i]) eq 'SCALAR') { |
+ push(@raw, $hrefs->[$i]); |
+ } else { |
+ croak "unknown type ".ref($hrefs->[$i])." passed to ".__PACKAGE__."->put()"; |
+ } |
+ push(@raw,"$self->{crlf}"); |
+ } |
+ \@raw; |
+} |
+ |
+sub get_pending { |
+ my $self = shift; |
+ return [ $self->{buffer} ] if length $self->{buffer}; |
+ return undef; |
+} |
+ |
+1; |
+ |
+__END__ |
+ |
+=head1 NAME |
+ |
+POE::Component::Client::Asterisk::Manager - Event-based Asterisk / Aefirion Manager Client |
+ |
+=head1 SYNOPSIS |
+ |
+ use POE qw( Component::Client::Asterisk::Manager ); |
+ |
+ POE::Component::Client::Asterisk::Manager->new( |
+ Alias => 'monitor', |
+ RemoteHost => 'localhost', |
+ RemotePort => 5038, # default port |
+ CallBacks => { |
+ input => ':all', # catchall for all manager events |
+ ring => { |
+ 'Event' => 'Newchannel', |
+ 'State' => 'Ring', |
+ }, |
+ }, |
+ inline_states => { |
+ input => sub { |
+ my $input = $_[ARG0]; |
+ # good for figuring out what manager events look like |
+ require Data::Dumper; |
+ print Data::Dumper->Dump([$input]); |
+ }, |
+ ring => sub { |
+ my $input = $_[ARG0]; |
+ # $input is a hash ref with info from |
+ print STDERR "RING on channel $input->{Channel}\n"; |
+ }, |
+ }, |
+ ); |
+ |
+ $poe_kernel->run(); |
+ |
+=head1 DESCRIPTION |
+ |
+POE::Component::Client::Asterisk::Manager is an event driven Asterisk manager |
+client. This module should also work with Aefirion. |
+ |
+=head1 METHODS |
+ |
+=head2 new() |
+ |
+This method creates a POE::Component::Client::TCP session and works inside |
+that session. You can specify the alias, host, port and inline_states. |
+See the synopsis for an example. |
+ |
+=head1 CALLBACKS |
+ |
+Callbacks are events that meet a criteria specified in a hash ref |
+ |
+For example: |
+ |
+ ring => { |
+ 'Event' => 'Newchannel', |
+ 'State' => 'Ring', |
+ }, |
+ |
+The event 'ring' will be called with a hash href in ARG0 when the component |
+receives a manager event matching 'Newchannel' and manager state 'Ring'. |
+ |
+You can specify a catch all event like this: |
+ |
+ catch_all => ':all' |
+ |
+Note: This was changed from 'default' to ':all' in an effort to make it |
+more clear. 'default' will also work. |
+ |
+=head1 BUGS |
+ |
+Please report them via RT: |
+L<http://rt.cpan.org/NoAuth/ReportBug.html?Queue=POE%3A%3AComponent%3A%3AClient%3A%3AAsterisk%3A%3AManager> |
+ |
+=head1 EXAMPLES |
+ |
+There are a few examples in the examples directory that can get you going. |
+ |
+=head1 AUTHOR |
+ |
+David Davis, E<lt>xantus@cpan.orgE<gt> |
+ |
+=head1 SEE ALSO |
+ |
+perl(1), POE::Filter::Asterisk::Manager, L<http://aefirion.org/>, L<http://asterisk.org/>, |
+L<http://teknikill.net/> |
+ |
+=cut |