Subversion Repositories general

Compare Revisions

No changes between revisions

Ignore whitespace Rev 1371 → Rev 1372

/asterisk-activecalls/POE/Component/Client/Asterisk/Manager.pm
File deleted
/asterisk-activecalls/trunk/POE/Component/Client/Asterisk/.exists
--- trunk/POE/Component/Client/Asterisk/Manager.pm (nonexistent)
+++ trunk/POE/Component/Client/Asterisk/Manager.pm (revision 1372)
@@ -0,0 +1,485 @@
+package POE::Component::Client::Asterisk::Manager;
+
+######################################################################
+### POE::Component::Client::Asterisk::Manager
+### David Davis (xantus@cpan.org)
+###
+### Copyright (c) 2003-2005 David Davis and Teknikill. All Rights
+### Reserved. This module is free software; you can redistribute it
+### and/or modify it under the same terms as Perl itself.
+######################################################################
+
+use strict;
+use warnings;
+
+our $VERSION = '0.08';
+
+use Carp qw(croak);
+use POE qw( Component::Client::TCP );
+use Digest::MD5;
+
+sub DEBUG { 0 }
+
+sub new {
+ my $package = shift;
+ croak "$package requires an even number of parameters" if @_ % 2;
+ my %params = @_;
+ my $alias = $params{'Alias'};
+
+ $alias = 'asterisk_client' unless defined($alias) and length($alias);
+
+ my $listen_port = $params{listen_port} || 5038;
+
+ POE::Session->create(
+ #options => {trace=>1},
+ args => [ %params ],
+ package_states => [
+ 'POE::Component::Client::Asterisk::Manager' => {
+ _start => '_start',
+ _stop => '_stop',
+ signals => 'signals',
+ }
+ ],
+ inline_states => $params{inline_states},
+# {
+# _default => sub {
+# print STDERR "$_[STATE] called\n";
+# },
+# },
+ );
+
+ return 1;
+}
+
+sub _start {
+ my ($kernel, $session, $heap) = @_[KERNEL, SESSION, HEAP];
+ my %params = splice(@_,ARG0);
+
+ if (ref($params{Options}) eq 'HASH') {
+ $session->option( %{$params{Options}} );
+ }
+
+ $params{reconnect_time} = $params{reconnect_time} || 5;
+ $params{requeue_posts} = $params{requeue_posts} || undef;
+
+ $kernel->alias_set($params{Alias});
+
+ # watch for SIGINT
+# $kernel->sig('INT', 'signals');
+
+ $heap->{client} = POE::Component::Client::TCP->new(
+ RemoteAddress => $params{RemoteHost},
+ RemotePort => $params{RemotePort},
+ # no longer a seperate package - see below
+ Filter => "POE::Filter::Asterisk::Manager",
+ Alias => "$params{Alias}_client",
+ Args => [ \%params ],
+ Started => sub {
+ $_[HEAP]->{params} = $_[ARG0];
+ },
+ Disconnected => sub {
+ $_[KERNEL]->delay(reconnect => $_[HEAP]->{params}->{reconnect_time});
+ },
+ Connected => sub {
+ my $heap = $_[HEAP];
+ DEBUG && print STDERR sprintf("connected to %s:%s\n",$heap->{params}->{RemoteHost},$heap->{params}->{RemotePort});
+ $heap->{_connected} = 0;
+ $heap->{_logged_in} = 0;
+ $heap->{_auth_stage} = 0;
+ $_[KERNEL]->delay( recv_timeout => 5 );
+
+ if ($heap->{params}->{Astmanproxy}) {
+ # For astmanproxy? Don't wait for a response
+ $heap->{_connected} = 1;
+ $kernel->call($_[SESSION] => login => splice(@_,ARG0));
+ }
+ },
+ ConnectError => sub {
+ my $heap = $_[HEAP];
+ DEBUG && print STDERR sprintf("could not connect to %s:%s, reconnecting in %s seconds...\n"
+ ,$heap->{params}->{RemoteHost},$heap->{params}->{RemotePort},$heap->{params}->{reconnect_time});
+ $_[KERNEL]->delay(reconnect => $heap->{params}->{reconnect_time});
+ },
+ ServerInput => sub {
+ my ( $kernel, $heap, $input ) = @_[KERNEL, HEAP, ARG0];
+
+ DEBUG && do {
+ require Data::Dumper;
+ print Data::Dumper->Dump([$input],['input']);
+ };
+
+ if ($heap->{_logged_in} == 0 && $heap->{_connected} == 0) {
+ $kernel->delay( recv_timeout => 5 );
+ if ($input->{acm_version}) {
+ $heap->{_version} = $input->{acm_version};
+ $heap->{_connected} = 1;
+ $kernel->call($_[SESSION] => login => splice(@_,ARG0));
+ } else {
+ print STDERR "Invalid Protocol (wrong port?)\n";
+ $kernel->yield("shutdown");
+ }
+ } elsif ($heap->{_connected} == 1 && $heap->{_logged_in} == 0) {
+ $kernel->call($_[SESSION] => login => splice(@_,ARG0));
+ } elsif ($heap->{_logged_in} == 1) {
+ $kernel->call($_[SESSION] => callbacks => splice(@_,ARG0));
+ }
+ },
+
+ InlineStates => {
+ _put => sub {
+ my $heap = $_[HEAP];
+ if ($heap->{server}) {
+ $heap->{server}->put($_[ARG0]);
+ } else {
+ if ($heap->{requeue_posts}) {
+ push(@{$heap->{queued}},$_[ARG0]);
+ } else {
+ print STDERR "cannot send when not connected! -ignored-\n";
+ }
+ }
+ },
+ login_complete => sub {
+ my ( $kernel, $heap ) = @_[KERNEL, HEAP];
+ DEBUG && print STDERR "logged in and ready to process events\n";
+ # call the _connected state
+ $kernel->yield("_connected" => splice(@_,ARG0));
+ },
+ recv_timeout => sub {
+ my ( $kernel, $heap ) = @_[KERNEL, HEAP];
+ unless ($heap->{_connected} == 1) {
+ print STDERR "Timeout waiting for response\n";
+ $heap->{_connected} = 0;
+ $heap->{_logged_in} = 0;
+ $kernel->yield("shutdown");
+ }
+ },
+ login => sub {
+ my ($kernel, $heap, $input) = @_[KERNEL, HEAP, ARG0];
+ if ($heap->{_logged_in} == 1) {
+ # shouldn't get here
+ DEBUG && print STDERR "Login called when already logged in\n";
+ #$kernel->yield(callbacks => splice(@_,ARG0));
+ return;
+ }
+ if ($heap->{_auth_stage} == 0) {
+ $heap->{server}->put({'Action' => 'Challenge', 'AuthType' => 'MD5'});
+ $heap->{_auth_stage} = 1;
+ } elsif ($heap->{_auth_stage} == 1) {
+ unless ($input->{Response} && lc($input->{Response}) eq 'success') {
+ print STDERR "AuthType MD5 may not be supported\n";
+ $kernel->yield("shutdown");
+ return;
+ }
+ if ($input->{Challenge}) {
+ if (! defined $heap->{params}->{Password}) {
+ print STDERR "No password provided\n";
+ $kernel->yield("shutdown");
+ return;
+ }
+
+ my $digest = Digest::MD5::md5_hex("$input->{Challenge}$heap->{params}->{Password}");
+ $heap->{server}->put({'Action' => 'Login', 'AuthType' => 'MD5', 'Username' => $heap->{params}->{Username}, 'Key' => $digest });
+ $heap->{_auth_stage} = 2;
+ }
+ } elsif ($heap->{_auth_stage} == 2) {
+ if ($input->{Message} && lc($input->{Message}) eq 'authentication accepted') {
+ delete $heap->{_auth_stage};
+ $heap->{_logged_in} = 1;
+ # I remembered inline_states not working (above), so i put this in
+ foreach my $k (keys %{$heap->{params}->{inline_states}}) {
+ $kernel->state($k => $heap->{params}->{inline_states}{$k});
+ }
+ if (ref($heap->{queued}) eq 'ARRAY') {
+ foreach my $a (@{$heap->{queued}}) {
+ $heap->{server}->put($a);
+ }
+ delete $heap->{queued};
+ }
+ $kernel->yield(login_complete => splice(@_,ARG0));
+ } elsif ($input->{Message} && lc($input->{Message}) eq 'authentication failed') {
+ print STDERR "Authentication failed.\n";
+ $heap->{_connected} = 0;
+ $heap->{_logged_in} = 0;
+ $kernel->yield("shutdown");
+ }
+ }
+ },
+ callbacks => sub {
+ my ($kernel, $heap, $session, $input) = @_[KERNEL, HEAP, SESSION, ARG0];
+ # TODO this stuff needs some work
+ next unless (ref($input));
+ my $qual = 0;
+ foreach my $k (keys %{$heap->{params}->{CallBacks}}) {
+ my $match = 0;
+ if (ref($heap->{params}->{CallBacks}{$k}) eq 'HASH') {
+ foreach my $c (keys %{$heap->{params}->{CallBacks}{$k}}) {
+ last if ($match == 1);
+ if (exists($input->{$c}) && $heap->{params}->{CallBacks}{$k}{$c} eq $input->{$c}) {
+ $match = 2;
+ $qual++;
+ } else {
+ $match = 1;
+ }
+ }
+ # matched ALL of the callback (not 2 of them like it looks like)
+ if ($match == 2) {
+ # callback good
+ DEBUG && print STDERR "callback $k is good\n";
+ $kernel->yield($k => $input);
+ }
+ } elsif ($heap->{params}->{CallBacks}{$k} eq ':all' || $heap->{params}->{CallBacks}{$k} eq 'default') {
+ $kernel->yield($k => $input);
+ } else {
+ print STDERR "Incorrectly written callback $k\n";
+ }
+ }
+ # use the :all qualifier now
+ #if ($qual == 0) {
+ # $kernel->yield("default" => splice(@_,ARG0));
+ #}
+ },
+ },
+ );
+ DEBUG && print STDERR "Client started.\n";
+}
+
+sub _stop {
+ $_[KERNEL]->yield("shutdown");
+ DEBUG && print STDERR "Client stopped.\n";
+}
+
+# Handle incoming signals (INT)
+
+# TODO disconnect gracefully
+sub signals {
+ my $signal_name = $_[ARG0];
+
+# DEBUG && print STDERR "Client caught SIG$signal_name\n";
+
+ # do not handle the signal
+ return 0;
+}
+
+
+1;
+
+package POE::Filter::Asterisk::Manager;
+
+use strict;
+use Carp qw(croak);
+
+sub DEBUG { 0 };
+
+sub new {
+ my $type = shift;
+ my $self = {
+ buffer => '',
+ crlf => "\x0D\x0A",
+ };
+ bless $self, $type;
+ $self;
+}
+
+sub get {
+ my ($self, $stream) = @_;
+
+ # Accumulate data in a framing buffer.
+ $self->{buffer} .= join('', @$stream);
+
+ my $many = [];
+ while (1) {
+ my $input = $self->get_one([]);
+ if ($input) {
+ push(@$many,@$input);
+ } else {
+ last;
+ }
+ }
+
+ return $many;
+}
+
+sub get_one_start {
+ my ($self, $stream) = @_;
+
+ DEBUG && do {
+ my $temp = join '', @$stream;
+ $temp = unpack 'H*', $temp;
+ warn "got some raw data: $temp\n";
+ };
+
+ # Accumulate data in a framing buffer.
+ $self->{buffer} .= join('', @$stream);
+}
+
+sub get_one {
+ my $self = shift;
+
+ return [] if ($self->{finish});
+
+
+ if ($self->{buffer} =~ s#^(?:Asterisk|Aefirion) Call Manager(?: Proxy)?/(\d+\.\d+\w*)$self->{crlf}##is) {
+ return [{ acm_version => $1 }];
+ }
+
+ return [] unless ($self->{crlf});
+ my $crlf = $self->{crlf};
+
+ # collect lines in buffer until we find a double line
+ return [] unless($self->{buffer} =~ m/${crlf}${crlf}/s);
+
+
+ $self->{buffer} =~ s/(^.*?)(${crlf}${crlf})//s;
+
+ my $buf = "$1${crlf}";
+
+ my $kv = {};
+
+ foreach my $line (split(/(:?${crlf})/,$buf)) {
+ my $tmp = $line;
+ $tmp =~ s/\r|\n//g;
+ next unless($tmp);
+
+ if ($line =~ m/([\w\-]+)\s*:\s+(.*)/) {
+ my $key = $1;
+ my $val = $2;
+ DEBUG && print "recv key $key: $val\n";
+
+ if ($key eq 'Variable',) {
+ for my $v( split /\r/, $val ) {
+ $v =~ s/^Variable\:\s*//;
+ my @parts = split /=/, $v;
+ $kv->{$key}{$parts[0]} = $parts[1];
+ DEBUG && print " recv variable: $parts[0] => $parts[1]\n";
+ }
+ } else {
+ $kv->{$key} = $val;
+ }
+ } else {
+ $kv->{content} .= "$line";
+ }
+ }
+
+ return (keys %$kv) ? [$kv] : [];
+}
+
+sub put {
+ my ($self, $hrefs) = @_;
+ my @raw;
+ for my $i ( 0 .. $#{$hrefs} ) {
+ if (ref($hrefs->[$i]) eq 'HASH') {
+ foreach my $k (keys %{$hrefs->[$i]}) {
+ DEBUG && print "send key $k: $hrefs->[$i]{$k}\n";
+ push(@raw,"$k: $hrefs->[$i]{$k}$self->{crlf}");
+ }
+ } elsif (ref($hrefs->[$i]) eq 'ARRAY') {
+ push(@raw, join("$self->{crlf}", @{$hrefs->[$i]}, ""));
+ } elsif (ref($hrefs->[$i]) eq 'SCALAR') {
+ push(@raw, $hrefs->[$i]);
+ } else {
+ croak "unknown type ".ref($hrefs->[$i])." passed to ".__PACKAGE__."->put()";
+ }
+ push(@raw,"$self->{crlf}");
+ }
+ \@raw;
+}
+
+sub get_pending {
+ my $self = shift;
+ return [ $self->{buffer} ] if length $self->{buffer};
+ return undef;
+}
+
+1;
+
+__END__
+
+=head1 NAME
+
+POE::Component::Client::Asterisk::Manager - Event-based Asterisk / Aefirion Manager Client
+
+=head1 SYNOPSIS
+
+ use POE qw( Component::Client::Asterisk::Manager );
+
+ POE::Component::Client::Asterisk::Manager->new(
+ Alias => 'monitor',
+ RemoteHost => 'localhost',
+ RemotePort => 5038, # default port
+ CallBacks => {
+ input => ':all', # catchall for all manager events
+ ring => {
+ 'Event' => 'Newchannel',
+ 'State' => 'Ring',
+ },
+ },
+ inline_states => {
+ input => sub {
+ my $input = $_[ARG0];
+ # good for figuring out what manager events look like
+ require Data::Dumper;
+ print Data::Dumper->Dump([$input]);
+ },
+ ring => sub {
+ my $input = $_[ARG0];
+ # $input is a hash ref with info from
+ print STDERR "RING on channel $input->{Channel}\n";
+ },
+ },
+ );
+
+ $poe_kernel->run();
+
+=head1 DESCRIPTION
+
+POE::Component::Client::Asterisk::Manager is an event driven Asterisk manager
+client. This module should also work with Aefirion.
+
+=head1 METHODS
+
+=head2 new()
+
+This method creates a POE::Component::Client::TCP session and works inside
+that session. You can specify the alias, host, port and inline_states.
+See the synopsis for an example.
+
+=head1 CALLBACKS
+
+Callbacks are events that meet a criteria specified in a hash ref
+
+For example:
+
+ ring => {
+ 'Event' => 'Newchannel',
+ 'State' => 'Ring',
+ },
+
+The event 'ring' will be called with a hash href in ARG0 when the component
+receives a manager event matching 'Newchannel' and manager state 'Ring'.
+
+You can specify a catch all event like this:
+
+ catch_all => ':all'
+
+Note: This was changed from 'default' to ':all' in an effort to make it
+more clear. 'default' will also work.
+
+=head1 BUGS
+
+Please report them via RT:
+L<http://rt.cpan.org/NoAuth/ReportBug.html?Queue=POE%3A%3AComponent%3A%3AClient%3A%3AAsterisk%3A%3AManager>
+
+=head1 EXAMPLES
+
+There are a few examples in the examples directory that can get you going.
+
+=head1 AUTHOR
+
+David Davis, E<lt>xantus@cpan.orgE<gt>
+
+=head1 SEE ALSO
+
+perl(1), POE::Filter::Asterisk::Manager, L<http://aefirion.org/>, L<http://asterisk.org/>,
+L<http://teknikill.net/>
+
+=cut