Subversion Repositories general

Compare Revisions

No changes between revisions

Ignore whitespace Rev 1369 → Rev 1370

/asterisk-activecalls/POE/Component/Client/Asterisk/.exists
--- POE/Component/Client/Asterisk/Manager.pm (nonexistent)
+++ POE/Component/Client/Asterisk/Manager.pm (revision 1370)
@@ -0,0 +1,485 @@
+package POE::Component::Client::Asterisk::Manager;
+
+######################################################################
+### POE::Component::Client::Asterisk::Manager
+### David Davis (xantus@cpan.org)
+###
+### Copyright (c) 2003-2005 David Davis and Teknikill. All Rights
+### Reserved. This module is free software; you can redistribute it
+### and/or modify it under the same terms as Perl itself.
+######################################################################
+
+use strict;
+use warnings;
+
+our $VERSION = '0.08';
+
+use Carp qw(croak);
+use POE qw( Component::Client::TCP );
+use Digest::MD5;
+
+sub DEBUG { 0 } # debug switch: the `DEBUG && ...` diagnostics in this package only run when this returns true
+
+# Spawn the component session.
+#
+# Takes a flat list of key/value parameters (croaks on an odd count).
+# The whole list is handed to _start unchanged, apart from Alias, which
+# defaults to 'asterisk_client' when missing or empty.
+# Returns 1; the session itself is owned by the POE kernel.
+sub new {
+    my $package = shift;
+    croak "$package requires an even number of parameters" if @_ % 2;
+    my %params = @_;
+
+    # The default must be written back into %params: _start only ever sees
+    # %params, so a default kept in a local variable is never applied and
+    # alias_set() would be called with undef.
+    $params{Alias} = 'asterisk_client'
+        unless defined($params{Alias}) and length($params{Alias});
+
+    POE::Session->create(
+        #options => {trace=>1},
+        args => [ %params ],
+        package_states => [
+            'POE::Component::Client::Asterisk::Manager' => {
+                _start  => '_start',
+                _stop   => '_stop',
+                signals => 'signals',
+            }
+        ],
+        inline_states => $params{inline_states},
+    );
+
+    return 1;
+}
+
+# _start handler of the component session: records the caller's parameters
+# and creates the POE::Component::Client::TCP session that talks to the
+# manager port.
+#
+# ARG0.. carry the key/value pairs given to new().  Keys read here:
+#   Options                - hash ref passed to $session->option()
+#   Alias                  - alias of this session; the TCP client gets "<Alias>_client"
+#   RemoteHost, RemotePort - address of the manager interface
+#   reconnect_time         - seconds to wait before reconnecting (default 5)
+#   requeue_posts          - when true, _put queues data while disconnected
+#   Astmanproxy            - when true, start the login without waiting for a banner
+#   Username, Password     - credentials for the MD5 challenge login
+#   CallBacks              - event name => ':all' / 'default' / hash of fields to match
+#   inline_states          - handlers registered on the TCP session after login
+sub _start {
+    my ($kernel, $session, $heap) = @_[KERNEL, SESSION, HEAP];
+    my %params = splice(@_,ARG0);
+
+    if (ref($params{Options}) eq 'HASH') {
+        $session->option( %{$params{Options}} );
+    }
+
+    $params{reconnect_time} = $params{reconnect_time} || 5;
+    $params{requeue_posts} = $params{requeue_posts} || undef;
+
+    $kernel->alias_set($params{Alias});
+
+    # watch for SIGINT
+#    $kernel->sig('INT', 'signals');
+
+    $heap->{client} = POE::Component::Client::TCP->new(
+        RemoteAddress => $params{RemoteHost},
+        RemotePort => $params{RemotePort},
+        # no longer a separate package - the filter is defined later in this file
+        Filter => "POE::Filter::Asterisk::Manager",
+        Alias => "$params{Alias}_client",
+        Args => [ \%params ],
+        Started => sub {
+            # keep the parameters on the TCP session's own heap
+            $_[HEAP]->{params} = $_[ARG0];
+        },
+        Disconnected => sub {
+            $_[KERNEL]->delay(reconnect => $_[HEAP]->{params}->{reconnect_time});
+        },
+        Connected => sub {
+            my $heap = $_[HEAP];
+            DEBUG && print STDERR sprintf("connected to %s:%s\n",$heap->{params}->{RemoteHost},$heap->{params}->{RemotePort});
+            # reset the handshake state for this connection
+            $heap->{_connected} = 0;
+            $heap->{_logged_in} = 0;
+            $heap->{_auth_stage} = 0;
+            $_[KERNEL]->delay( recv_timeout => 5 );
+
+            if ($heap->{params}->{Astmanproxy}) {
+                # For astmanproxy? Don't wait for a response
+                $heap->{_connected} = 1;
+                # use the kernel passed to this handler rather than the
+                # variable captured from the enclosing _start
+                $_[KERNEL]->call($_[SESSION] => login => splice(@_,ARG0));
+            }
+        },
+        ConnectError => sub {
+            my $heap = $_[HEAP];
+            DEBUG && print STDERR sprintf("could not connect to %s:%s, reconnecting in %s seconds...\n"
+                ,$heap->{params}->{RemoteHost},$heap->{params}->{RemotePort},$heap->{params}->{reconnect_time});
+            $_[KERNEL]->delay(reconnect => $heap->{params}->{reconnect_time});
+        },
+        ServerInput => sub {
+            my ( $kernel, $heap, $input ) = @_[KERNEL, HEAP, ARG0];
+
+            DEBUG && do {
+                require Data::Dumper;
+                print Data::Dumper->Dump([$input],['input']);
+            };
+
+            if ($heap->{_logged_in} == 0 && $heap->{_connected} == 0) {
+                # first input must be the version banner
+                $kernel->delay( recv_timeout => 5 );
+                if ($input->{acm_version}) {
+                    $heap->{_version} = $input->{acm_version};
+                    $heap->{_connected} = 1;
+                    $kernel->call($_[SESSION] => login => splice(@_,ARG0));
+                } else {
+                    print STDERR "Invalid Protocol (wrong port?)\n";
+                    $kernel->yield("shutdown");
+                }
+            } elsif ($heap->{_connected} == 1 && $heap->{_logged_in} == 0) {
+                # still authenticating: feed the reply to the login state machine
+                $kernel->call($_[SESSION] => login => splice(@_,ARG0));
+            } elsif ($heap->{_logged_in} == 1) {
+                $kernel->call($_[SESSION] => callbacks => splice(@_,ARG0));
+            }
+        },
+
+        InlineStates => {
+            # Send ARG0 to the server, or queue / drop it when not connected.
+            _put => sub {
+                my $heap = $_[HEAP];
+                if ($heap->{server}) {
+                    $heap->{server}->put($_[ARG0]);
+                } else {
+                    # requeue_posts is one of the caller's parameters, which
+                    # the Started handler stores under {params}
+                    if ($heap->{params}->{requeue_posts}) {
+                        push(@{$heap->{queued}},$_[ARG0]);
+                    } else {
+                        print STDERR "cannot send when not connected! -ignored-\n";
+                    }
+                }
+            },
+            login_complete => sub {
+                my $kernel = $_[KERNEL];
+                DEBUG && print STDERR "logged in and ready to process events\n";
+                # call the _connected state
+                $kernel->yield("_connected" => splice(@_,ARG0));
+            },
+            recv_timeout => sub {
+                my ( $kernel, $heap ) = @_[KERNEL, HEAP];
+                unless ($heap->{_connected} == 1) {
+                    print STDERR "Timeout waiting for response\n";
+                    $heap->{_connected} = 0;
+                    $heap->{_logged_in} = 0;
+                    $kernel->yield("shutdown");
+                }
+            },
+            # Login state machine, driven by _auth_stage:
+            #   0 - send the MD5 challenge request
+            #   1 - answer the challenge with the hashed password
+            #   2 - evaluate the login result
+            login => sub {
+                my ($kernel, $heap, $input) = @_[KERNEL, HEAP, ARG0];
+                if ($heap->{_logged_in} == 1) {
+                    # shouldn't get here
+                    DEBUG && print STDERR "Login called when already logged in\n";
+                    #$kernel->yield(callbacks => splice(@_,ARG0));
+                    return;
+                }
+                if ($heap->{_auth_stage} == 0) {
+                    $heap->{server}->put({'Action' => 'Challenge', 'AuthType' => 'MD5'});
+                    $heap->{_auth_stage} = 1;
+                } elsif ($heap->{_auth_stage} == 1) {
+                    unless ($input->{Response} && lc($input->{Response}) eq 'success') {
+                        print STDERR "AuthType MD5 may not be supported\n";
+                        $kernel->yield("shutdown");
+                        return;
+                    }
+                    if ($input->{Challenge}) {
+                        if (! defined $heap->{params}->{Password}) {
+                            print STDERR "No password provided\n";
+                            $kernel->yield("shutdown");
+                            return;
+                        }
+
+                        my $digest = Digest::MD5::md5_hex("$input->{Challenge}$heap->{params}->{Password}");
+                        $heap->{server}->put({'Action' => 'Login', 'AuthType' => 'MD5', 'Username' => $heap->{params}->{Username}, 'Key' => $digest });
+                        $heap->{_auth_stage} = 2;
+                    }
+                } elsif ($heap->{_auth_stage} == 2) {
+                    if ($input->{Message} && lc($input->{Message}) eq 'authentication accepted') {
+                        delete $heap->{_auth_stage};
+                        $heap->{_logged_in} = 1;
+                        # I remembered inline_states not working (above), so i put this in
+                        foreach my $k (keys %{$heap->{params}->{inline_states}}) {
+                            $kernel->state($k => $heap->{params}->{inline_states}{$k});
+                        }
+                        # flush whatever _put queued while we were offline
+                        if (ref($heap->{queued}) eq 'ARRAY') {
+                            foreach my $queued (@{$heap->{queued}}) {
+                                $heap->{server}->put($queued);
+                            }
+                            delete $heap->{queued};
+                        }
+                        $kernel->yield(login_complete => splice(@_,ARG0));
+                    } elsif ($input->{Message} && lc($input->{Message}) eq 'authentication failed') {
+                        print STDERR "Authentication failed.\n";
+                        $heap->{_connected} = 0;
+                        $heap->{_logged_in} = 0;
+                        $kernel->yield("shutdown");
+                    }
+                }
+            },
+            # Dispatch one parsed manager message to every matching CallBacks entry.
+            callbacks => sub {
+                my ($kernel, $heap, $input) = @_[KERNEL, HEAP, ARG0];
+                # TODO this stuff needs some work
+                # a handler is not a loop body, so leave it with return, not next
+                return unless (ref($input));
+                foreach my $k (keys %{$heap->{params}->{CallBacks}}) {
+                    my $match = 0;
+                    if (ref($heap->{params}->{CallBacks}{$k}) eq 'HASH') {
+                        # $match: 0 = nothing checked, 1 = a field differed, 2 = all fields so far equal
+                        foreach my $c (keys %{$heap->{params}->{CallBacks}{$k}}) {
+                            last if ($match == 1);
+                            if (exists($input->{$c}) && $heap->{params}->{CallBacks}{$k}{$c} eq $input->{$c}) {
+                                $match = 2;
+                            } else {
+                                $match = 1;
+                            }
+                        }
+                        # matched ALL of the callback (not 2 of them like it looks like)
+                        if ($match == 2) {
+                            # callback good
+                            DEBUG && print STDERR "callback $k is good\n";
+                            $kernel->yield($k => $input);
+                        }
+                    } elsif ($heap->{params}->{CallBacks}{$k} eq ':all' || $heap->{params}->{CallBacks}{$k} eq 'default') {
+                        $kernel->yield($k => $input);
+                    } else {
+                        print STDERR "Incorrectly written callback $k\n";
+                    }
+                }
+            },
+        },
+    );
+    DEBUG && print STDERR "Client started.\n";
+}
+
+# _stop handler of the component session: posts "shutdown" to the session
+# being stopped and, in debug mode, notes the stop on STDERR.
+sub _stop {
+    my $kernel = $_[KERNEL];
+    $kernel->yield("shutdown");
+    DEBUG and print STDERR "Client stopped.\n";
+}
+
+# Handle incoming signals (INT)
+
+# TODO disconnect gracefully
+# Signal handler state (the kernel->sig registration in _start is disabled).
+# Always returns 0, i.e. the signal is reported as not handled.
+sub signals {
+    my $signal_name = $_[ARG0];    # only needed by the disabled debug line below
+
+#    DEBUG && print STDERR "Client caught SIG$signal_name\n";
+
+    # do not handle the signal
+    0;
+}
+
+
+1;
+
+package POE::Filter::Asterisk::Manager;
+
+use strict;
+use Carp qw(croak);
+
+sub DEBUG { 0 }; # debug switch for the filter: `DEBUG && ...` diagnostics run only when this returns true
+
+# Build a filter with an empty framing buffer and CRLF as line terminator.
+sub new {
+    my ($class) = @_;
+    return bless {
+        buffer => '',
+        crlf   => "\x0D\x0A",
+    }, $class;
+}
+
+# Append the chunks in @$stream to the framing buffer and return an array
+# ref with every complete message that can be parsed from it.
+sub get {
+    my ($self, $stream) = @_;
+
+    # Accumulate data in a framing buffer.
+    $self->{buffer} .= join('', @$stream);
+
+    my @records;
+    while (1) {
+        my $before = length $self->{buffer};
+        my $parsed = $self->get_one();
+        push(@records, @$parsed);
+
+        # get_one always returns an array ref (possibly empty), so its
+        # truth value cannot end the loop; stop once it no longer consumes
+        # anything from the buffer.
+        last if length($self->{buffer}) == $before;
+    }
+
+    return \@records;
+}
+
+# Append the chunks in @$stream to the framing buffer; get_one() then
+# extracts messages from it one at a time.
+sub get_one_start {
+    my ($self, $stream) = @_;
+
+    my $chunk = join('', @$stream);
+
+    DEBUG && do {
+        # hex dump of the raw bytes
+        warn "got some raw data: " . unpack('H*', $chunk) . "\n";
+    };
+
+    # Accumulate data in a framing buffer.
+    $self->{buffer} .= $chunk;
+}
+
+# Extract at most one message from the framing buffer.
+#
+# Returns an array ref holding
+#   - { acm_version => ... } for the version banner, or
+#   - one hash ref of "Key: value" pairs for a block terminated by an empty
+#     line; "Variable" entries are collected into a nested hash and lines
+#     without a key go to {content},
+# or an empty array ref when no complete message is buffered.
+sub get_one {
+    my $self = shift;
+
+    return [] if ($self->{finish});
+
+    # the banner is a single line, not a key/value block
+    if ($self->{buffer} =~ s#^(?:Asterisk|Aefirion) Call Manager(?: Proxy)?/(\d+\.\d+\w*)$self->{crlf}##is) {
+        return [{ acm_version => $1 }];
+    }
+
+    return [] unless ($self->{crlf});
+    my $crlf = $self->{crlf};
+
+    # collect lines in buffer until we find a double line
+    return [] unless ($self->{buffer} =~ s/(^.*?)${crlf}${crlf}//s);
+
+    my $buf = "$1${crlf}";
+
+    my $kv = {};
+
+    # non-capturing group: the separators themselves are not wanted as fields
+    foreach my $line (split(/(?:${crlf})/,$buf)) {
+        my $tmp = $line;
+        $tmp =~ s/\r|\n//g;
+        next unless($tmp);
+
+        if ($line =~ m/([\w\-]+)\s*:\s+(.*)/) {
+            my $key = $1;
+            my $val = $2;
+            DEBUG && print "recv key $key: $val\n";
+
+            if ($key eq 'Variable') {
+                for my $v( split /\r/, $val ) {
+                    $v =~ s/^Variable\:\s*//;
+                    # split on the first '=' only, so values may contain '='
+                    my ($name, $value) = split /=/, $v, 2;
+                    $kv->{$key}{$name} = $value;
+                    DEBUG && print " recv variable: $name => $value\n";
+                }
+            } else {
+                $kv->{$key} = $val;
+            }
+        } else {
+            $kv->{content} .= "$line";
+        }
+    }
+
+    return (keys %$kv) ? [$kv] : [];
+}
+
+# Serialise outgoing messages.
+#
+# $hrefs is an array ref whose elements may be
+#   - a hash ref:   written as "Key: value" lines,
+#   - an array ref: its elements are written as lines,
+#   - a scalar ref: the referenced string is written verbatim.
+# Every message is followed by an empty line.  Croaks on any other element.
+# Returns an array ref of raw strings.
+sub put {
+    my ($self, $hrefs) = @_;
+    my @raw;
+    for my $item (@$hrefs) {
+        my $type = ref($item);
+        if ($type eq 'HASH') {
+            foreach my $k (keys %$item) {
+                DEBUG && print "send key $k: $item->{$k}\n";
+                push(@raw,"$k: $item->{$k}$self->{crlf}");
+            }
+        } elsif ($type eq 'ARRAY') {
+            push(@raw, join("$self->{crlf}", @$item, ""));
+        } elsif ($type eq 'SCALAR') {
+            # send the string, not the stringified reference
+            push(@raw, $$item);
+        } else {
+            croak "unknown type ".$type." passed to ".__PACKAGE__."->put()";
+        }
+        push(@raw,"$self->{crlf}");
+    }
+    return \@raw;
+}
+
+# Return the unparsed remainder of the framing buffer as a one-element
+# array ref, or undef when the buffer is empty.
+sub get_pending {
+    my ($self) = @_;
+    return undef unless length $self->{buffer};
+    return [ $self->{buffer} ];
+}
+
+1;
+
+__END__
+
+=head1 NAME
+
+POE::Component::Client::Asterisk::Manager - Event-based Asterisk / Aefirion Manager Client
+
+=head1 SYNOPSIS
+
+ use POE qw( Component::Client::Asterisk::Manager );
+
+ POE::Component::Client::Asterisk::Manager->new(
+ Alias => 'monitor',
+ RemoteHost => 'localhost',
+ RemotePort => 5038, # default port
+ CallBacks => {
+ input => ':all', # catchall for all manager events
+ ring => {
+ 'Event' => 'Newchannel',
+ 'State' => 'Ring',
+ },
+ },
+ inline_states => {
+ input => sub {
+ my $input = $_[ARG0];
+ # good for figuring out what manager events look like
+ require Data::Dumper;
+ print Data::Dumper->Dump([$input]);
+ },
+ ring => sub {
+ my $input = $_[ARG0];
+ # $input is a hash ref with the fields of the manager event
+ print STDERR "RING on channel $input->{Channel}\n";
+ },
+ },
+ );
+
+ $poe_kernel->run();
+
+=head1 DESCRIPTION
+
+POE::Component::Client::Asterisk::Manager is an event driven Asterisk manager
+client. This module should also work with Aefirion.
+
+=head1 METHODS
+
+=head2 new()
+
+This method creates a POE::Component::Client::TCP session and works inside
+that session. You can specify the alias, host, port and inline_states.
+See the synopsis for an example.
+
+=head1 CALLBACKS
+
+Callbacks are events that are fired when a manager message meets the criteria specified in a hash ref.
+
+For example:
+
+ ring => {
+ 'Event' => 'Newchannel',
+ 'State' => 'Ring',
+ },
+
+The event 'ring' will be called with a hash ref in ARG0 when the component
+receives a manager event matching 'Newchannel' and manager state 'Ring'.
+
+You can specify a catch all event like this:
+
+ catch_all => ':all'
+
+Note: This was changed from 'default' to ':all' in an effort to make it
+more clear. 'default' will also work.
+
+=head1 BUGS
+
+Please report them via RT:
+L<http://rt.cpan.org/NoAuth/ReportBug.html?Queue=POE%3A%3AComponent%3A%3AClient%3A%3AAsterisk%3A%3AManager>
+
+=head1 EXAMPLES
+
+There are a few examples in the examples directory that can get you going.
+
+=head1 AUTHOR
+
+David Davis, E<lt>xantus@cpan.orgE<gt>
+
+=head1 SEE ALSO
+
+perl(1), POE::Filter::Asterisk::Manager, L<http://aefirion.org/>, L<http://asterisk.org/>,
+L<http://teknikill.net/>
+
+=cut
/asterisk-activecalls/activecalls.pl
0,0 → 1,650
#!/usr/local/bin/perl -w
 
# TODO:
# - 'dst' is name, not number if called SIP internally (?)
 
# fields not set: amaflags, accountcode, userfield
 
# One could issue 'Action: Status' directly every time a list of active calls is needed,
# without an intermediate database, but that gives no information about the link time,
# and while a channel is still in state 'Ring' there is no reliable way to find its peer channel.
# Alternatively one can use 'Action: Command\r\nCommand: core show channel ...', which returns
# the full CDR info, but it is very slow (several seconds in my test with two channels).
 
# == globals ======================================================================================
use strict;
use vars;
use subs;
use DBI;
use POE qw( Component::Client::Asterisk::Manager );
 
# Settings; filled in by the config file that main() loads with require.
our %config;
# Database handle, opened (and re-opened after a failed ping) by db_connect().
my $db;
# Keyed by the Uniqueid of the channel.
my %channels; # active channels
# Status events collected after login, keyed by channel name.
my %channels_from_status; # channels already running on startup
 
# =================================================================================================
# Write one line to STDERR in the form "<timestamp> : <message>".
#   $input - optional manager event (hash ref); its Timestamp is used when set
#   $msg   - text to log
sub logmsg {
    my ($input, $msg) = @_;

    my $timestamp;
    if ($input && $input->{Timestamp}) {
        $timestamp = $input->{Timestamp};
    }
    else {
        # no event time available: use the local clock (with a trailing blank)
        $timestamp = time() . " ";
    }

    print STDERR "$timestamp : $msg\n";
}
 
# =================================================================================================
# Make sure $db holds a live database handle.
# An existing handle is kept if it answers ping(); otherwise a new connection
# is attempted up to $config{db_connect_retries} times, sleeping
# $config{db_connect_pause} seconds after each failure.  Dies when no handle
# could be obtained.
sub db_connect {
    my $attempt = 0;
    while ($attempt < $config{db_connect_retries}) {
        if ($db) {
            last if $db->ping();
            # stale handle, throw it away and reconnect
            undef $db;
        }

        logmsg(undef, "Connnect to database");
        $db = DBI->connect(
            "DBI:Pg:dbname=$config{db_name};host=$config{db_host};port=$config{db_port}",
            $config{db_user},
            $config{db_password},
        );
        last if $db;

        sleep($config{db_connect_pause});
        ++$attempt;
    }

    die "Cannot connect to database" unless $db;
}
 
# =================================================================================================
# True when $id carries a usable caller id, i.e. it is a true value and is
# neither '<unknown>' nor '<Unknown>'.
sub caller_id_set {
    my ($id) = @_;

    return $id unless $id;
    return $id ne '<unknown>' && $id ne '<Unknown>';
}
 
# =================================================================================================
# Return the time of a manager event in whole seconds.
#   $input - optional event hash ref; its Timestamp (seconds, possibly with a
#            fraction) is truncated to an integer
# Falls back to the local clock when there is no event or no Timestamp.
sub get_timestamp {
    my $input = shift @_;

    # statement properly terminated (the original ended in a stray comma)
    return ($input && $input->{Timestamp} ? int($input->{Timestamp}) : time());
}
 
# =================================================================================================
# Derive the caller id of a channel.
# Returns a hash ref with
#   num  - CallerIDNum, if usable
#   name - CallerIDName, if usable
#   full - '"name" <num>', the number alone or '"name"', whichever parts exist
# Parts that are not usable (see caller_id_set) are undef.
sub find_caller_id {
    my ($channel) = @_;

    my $num  = caller_id_set($channel->{CallerIDNum})  ? $channel->{CallerIDNum}  : undef;
    my $name = caller_id_set($channel->{CallerIDName}) ? $channel->{CallerIDName} : undef;

    my $full = ($num && $name) ? "\"$name\" <$num>"
             : $num            ? $num
             : $name           ? "\"$name\""
             :                   undef;

    return { full => $full, num => $num, name => $name };
}
 
# =================================================================================================
# Insert a new row for a channel.
# The record id is taken from the sequence $config{db_seq} and stored in
# $channel->{Id}; calldate is derived from $channel->{BeginTimestamp}.
# uniqueid, channel, disposition, clid and src are written only when known.
# Clears $channel->{Modified}.  Dies on any database failure.
sub create_record {
    my $channel = shift @_;

    # Columns and their value expressions are collected as complete lists,
    # so the statement stays valid even when no optional column is present
    # (appending the optional part after a fixed "acctid,calldate," prefix
    # would leave dangling commas in that case).
    my @columns = ('acctid', 'calldate');
    my @exprs   = ('?', "TIMESTAMP WITH TIME ZONE 'epoch'+?*INTERVAL '1 second'");
    my @optional;

    my $callerId = find_caller_id($channel);
    my @candidates = (
        [ uniqueid    => $channel->{Uniqueid} ],
        [ channel     => $channel->{Channel} ],
        [ disposition => $channel->{Disposition} ],
        [ clid        => $callerId->{full} ],
        [ src         => $callerId->{num} ],
    );
    for my $candidate (@candidates) {
        my ($column, $value) = @$candidate;
        next unless $value;
        push(@columns, $column);
        push(@exprs, '?');
        push(@optional, $value);
    }

    db_connect();
    my $acctid_res = $db->selectrow_hashref("select nextval('" . $config{db_seq} . "')");
    die "Cannot get next acctid" unless($acctid_res);
    $channel->{Id} = $acctid_res->{nextval};

    logmsg(undef, "Create a new record " . $channel->{Id} . " for channel " . $channel->{Uniqueid});

    my $st = $db->prepare('insert into ' . $config{db_table}
        . ' (' . join(',', @columns) . ')'
        . ' values (' . join(',', @exprs) . ')');
    die "Cannot create prepared statement" unless($st);

    $st->execute($channel->{Id}, $channel->{BeginTimestamp}, @optional)
        || die "Cannot execute prepared statement";
    $channel->{Modified} = 0;
}
 
# =================================================================================================
# Write the current state of a channel to its existing row.
# Does nothing unless the channel has a record id and is flagged Modified.
# Only fields that are set are written; clears Modified afterwards.
sub update_record {
    my ($channel) = @_;

    return unless($channel->{Id} && $channel->{Modified});

    logmsg(undef, "Update record " . $channel->{Id} . " for channel " . $channel->{Uniqueid});

    # "column=?" fragments and their bind values, kept in step
    my (@assignments, @binds);
    my $set = sub {
        my ($sql, $value) = @_;
        push(@assignments, $sql);
        push(@binds, $value);
    };

    $set->("linkdate=TIMESTAMP WITH TIME ZONE 'epoch'+?*INTERVAL '1 second'", $channel->{LinkTimestamp})
        if $channel->{LinkTimestamp};
    $set->('disposition=?', $channel->{Disposition}) if $channel->{Disposition};
    $set->('dcontext=?', $channel->{Context}) if $channel->{Context};
    $set->('lastapp=?', $channel->{Application}) if $channel->{Application};
    # application data may legitimately be an empty string
    $set->('lastdata=?', $channel->{AppData}) if defined($channel->{AppData});
    $set->('dst=?', $channel->{Dest}) if $channel->{Dest};

    my $caller = find_caller_id($channel);
    $set->('clid=?', $caller->{full}) if $caller->{full};
    $set->('src=?', $caller->{num}) if $caller->{num};

    my $peer = $channel->{DestChannel};
    $set->('dstchannel=?', $peer->{Channel}) if $peer && $peer->{Channel};

    if (@assignments) {
        db_connect();
        my $st = $db->prepare('update ' . $config{db_table} . ' set ' . join(',', @assignments)
            . ' where acctid=?');
        die "Cannot create prepared statement" unless($st);

        $st->execute(@binds, $channel->{Id}) || die "Cannot execute prepared statement";
    }
    $channel->{Modified} = 0;
}
 
# =================================================================================================
# Remove the row of a channel (no-op for channels without a record id)
# and clear the channel's Modified flag.
sub delete_record {
    my ($channel) = @_;

    my $record_id = $channel->{Id};
    return unless($record_id);

    logmsg(undef, "Delete record " . $record_id . " for channel " . $channel->{Uniqueid});

    db_connect();
    my $st = $db->prepare('delete from ' . $config{db_table} . ' where acctid=?');
    die "Cannot create prepared statement" unless($st);

    $st->execute($record_id) || die "Cannot execute prepared statement";
    $channel->{Modified} = 0;
}
 
# =================================================================================================
# Finalise the row of a channel that has hung up: stores the last known
# disposition / application plus
#   duration = EndTimestamp - BeginTimestamp
#   billsec  = EndTimestamp - LinkTimestamp (0 if the call was never linked)
# No-op for channels without a record id.  Clears Modified.
sub close_record {
    my ($channel) = @_;

    return unless($channel->{Id});

    logmsg(undef, "Close record " . $channel->{Id} . " for channel " . $channel->{Uniqueid});

    my (@assignments, @binds);

    if ($channel->{Disposition}) {
        push(@assignments, 'disposition=?');
        push(@binds, $channel->{Disposition});
    }
    if ($channel->{Application}) {
        push(@assignments, 'lastapp=?');
        push(@binds, $channel->{Application});
    }
    if (defined($channel->{AppData})) {
        push(@assignments, 'lastdata=?');
        push(@binds, $channel->{AppData});
    }

    my $duration = $channel->{EndTimestamp} - $channel->{BeginTimestamp};
    my $billsec = 0;
    if ($channel->{LinkTimestamp}) {
        $billsec = $channel->{EndTimestamp} - $channel->{LinkTimestamp};
    }
    push(@assignments, 'duration=?', 'billsec=?');
    push(@binds, $duration, $billsec);

    db_connect();
    my $st = $db->prepare('update ' . $config{db_table} . ' set ' . join(',', @assignments)
        . ' where acctid=?');
    die "Cannot create prepared statement" unless($st);

    $st->execute(@binds, $channel->{Id}) || die "Cannot execute prepared statement";
    $channel->{Modified} = 0;
}
 
# =================================================================================================
# Look up the record id (acctid) stored for a channel's unique id.
# Returns the acctid of the first matching row, or undef if there is none.
sub find_record_id {
    my ($unique_id) = @_;

    db_connect();
    my $st = $db->prepare('select acctid from ' . $config{db_table} . ' where uniqueid=?');
    die "Cannot create prepared statement" unless($st);

    $st->execute($unique_id) || die "Cannot execute prepared statement";

    my @row = $st->fetchrow_array;
    unless (@row) {
        logmsg(undef, "Search record id for UniqueId " . $unique_id . ": not found");
        return undef;
    }

    logmsg(undef, "Search record id for UniqueId " . $unique_id . ": " . $row[0]);
    return $row[0];
}
 
# =================================================================================================
# Register a new channel from a manager event and return its state hash.
# The channel starts as 'NO ANSWER', flagged Modified, with the event time
# as BeginTimestamp; caller id fields are taken over from the same event.
sub add_channel {
    my ($input) = @_;

    my $uniqueid = $input->{Uniqueid};
    my $begin = get_timestamp($input);

    my %state = (
        Uniqueid       => $uniqueid,
        Channel        => $input->{Channel},
        BeginTimestamp => $begin,
        Disposition    => 'NO ANSWER',
        Modified       => 1,
    );
    my $channel = \%state;

    # must be registered before the caller id update, which looks it up
    $channels{$uniqueid} = $channel;
    update_channel_callerid($input);

    logmsg($input, "Add channel " . $channel->{Uniqueid});

    return $channel;
}
 
# =================================================================================================
# Copy caller id information from an event to the channel it belongs to.
# CallerIDNum wins; CallerID is only used while the channel has no usable
# number yet.  Returns the channel, or undef if the Uniqueid is unknown.
sub update_channel_callerid {
    my ($input) = @_;

    my $channel = $channels{$input->{Uniqueid}};
    return $channel unless $channel;

    if (caller_id_set($input->{CallerIDNum})) {
        $channel->{CallerIDNum} = $input->{CallerIDNum};
    }
    elsif (caller_id_set($input->{CallerID}) && !caller_id_set($channel->{CallerIDNum})) {
        $channel->{CallerIDNum} = $input->{CallerID};
    }

    if ($input->{CallerIDName}) {
        $channel->{CallerIDName} = $input->{CallerIDName};
    }
    $channel->{Modified} = 1;

    logmsg($input, "Update channel CallerID " . $channel->{Uniqueid} . " -> "
        . $channel->{CallerIDNum} . "/" . $channel->{CallerIDName});

    return $channel;
}
 
# =================================================================================================
# Cross-link the two channels named by SrcUniqueID / DestUniqueID of an
# event and flag both as Modified.  Either side may be unknown.
# Returns the source channel (undef if unknown).
sub update_channel_dest {
    my ($input) = @_;

    my ($src_id, $dest_id) = ($input->{SrcUniqueID}, $input->{DestUniqueID});
    logmsg($input, "Update channel dest " . $src_id . " -> " . $dest_id);

    my $src_channel = $channels{$src_id};
    my $dest_channel = $channels{$dest_id};

    if ($src_channel) {
        $src_channel->{DestChannel} = $dest_channel;
        $src_channel->{Modified} = 1;
    }
    if ($dest_channel) {
        $dest_channel->{SrcChannel} = $src_channel;
        $dest_channel->{Modified} = 1;
    }

    return $src_channel;
}
 
# =================================================================================================
# Apply the State of an event to its channel: state 'Busy' sets the
# disposition to 'BUSY'; other states are ignored.
# Returns the channel, or undef if the Uniqueid is unknown.
sub update_channel_state {
    my ($input) = @_;

    my $channel = $channels{$input->{Uniqueid}};
    return $channel unless $channel;

    if ($input->{State} eq 'Busy') {
        $channel->{Disposition} = 'BUSY';
        $channel->{Modified} = 1;
        logmsg($input, "Update channel disposition " . $channel->{Uniqueid} . " -> "
            . $channel->{Disposition});
    }

    return $channel;
}
 
# =================================================================================================
# Mark the channel named by Uniqueid1 of an event as answered and remember
# the event time as its LinkTimestamp.
# Returns the channel, or undef if it is unknown.
sub update_channel_link {
    my ($input) = @_;

    my $channel = $channels{$input->{Uniqueid1}};
    return $channel unless $channel;

    $channel->{LinkTimestamp} = get_timestamp($input);
    $channel->{Disposition} = 'ANSWERED';
    $channel->{Modified} = 1;
    logmsg($input, "Update channel disposition " . $channel->{Uniqueid} . " -> "
        . $channel->{Disposition});

    return $channel;
}
 
# =================================================================================================
# Record the application / data of an event on its channel, and the context
# and destination extension if the channel has none yet.
# Flags the channel Modified when anything changed.
# Returns the channel, or undef if the Uniqueid is unknown.
sub update_channel_exten {
    my $input = shift @_;

    my $channel = $channels{$input->{Uniqueid}};
    if($channel) {
        # Only ever raise the flag here: assigning the comparison result
        # directly would wipe out a modification that is still pending.
        # Undefined fields are compared as empty strings.
        for my $field (qw(Application AppData)) {
            my $old = defined($channel->{$field}) ? $channel->{$field} : '';
            my $new = defined($input->{$field}) ? $input->{$field} : '';
            $channel->{Modified} = 1 if($old ne $new);
        }

        $channel->{Application} = $input->{Application};
        $channel->{AppData} = $input->{AppData};

        if(!$channel->{Context}) {
            $channel->{Context} = $input->{Context};
            $channel->{Modified} = 1;
            logmsg($input, "Update channel context " . $channel->{Uniqueid} . " -> "
                . $channel->{Context});
        }
        if(!$channel->{Dest}) {
            $channel->{Dest} = $input->{Extension};
            $channel->{Modified} = 1;
        }
    }

    return $channel;
}
 
# =================================================================================================
# Handle the hangup of a channel: stamps EndTimestamp, derives the final
# disposition from the hangup cause and removes the channel from %channels.
# Causes 0 and 16 keep the current disposition, 17 means BUSY, 19 means
# NO ANSWER, anything else FAILED.
# Returns the (now unregistered) channel, or undef if it was unknown.
sub hangup_channel {
    my $input = shift @_;

    my $channel = $channels{$input->{Uniqueid}};
    if($channel) {
        $channel->{EndTimestamp} = get_timestamp($input);
        $channel->{Modified} = 1;

        my $cause = $input->{Cause};
        if($cause == 0 || $cause == 16) {
            # normal clearing
        }
        elsif($cause == 17) {
            $channel->{Disposition} = 'BUSY';
        }
        elsif($cause == 19) {
            $channel->{Disposition} = 'NO ANSWER';
        }
        else { # some unknown cause, should be a fail
            $channel->{Disposition} = 'FAILED';
        }

        logmsg($input, "Remove channel " . $input->{Uniqueid} . ", cause " . $input->{Cause}
            . " (" . $input->{'Cause-txt'} . ")");
        delete $channels{$input->{Uniqueid}};

        # A source channel and its destination reference each other
        # (DestChannel / SrcChannel).  Drop the back references so the pair
        # does not form a cycle that is never freed.  DestChannel is kept
        # because update_record reads it.
        my $dest_channel = $channel->{DestChannel};
        if($dest_channel && $dest_channel->{SrcChannel}
            && $dest_channel->{SrcChannel} == $channel) {
            delete $dest_channel->{SrcChannel};
        }
        delete $channel->{SrcChannel};
    }

    return $channel;
}
 
# =================================================================================================
# Rebuild %channels from the Status events collected in
# %channels_from_status, i.e. for calls that were already running when this
# script logged in.
#
# Each unused status entry is paired with its peer: the channel named in
# Link, or, for a channel still in state 'Ring', the entry with the next
# Uniqueid that has no Extension.  The side that has an Extension is
# treated as the source.  The source gets a database row (created unless
# one with the same uniqueid exists) which is then updated.
sub find_running_channels {
    STATUS:
    for my $input (values %channels_from_status) {
        next STATUS if($input->{Already_Used});

        my $link;
        if($input->{Link}) {
            $link = $channels_from_status{$input->{Link}};
        }
        elsif($input->{State} eq 'Ring') {
            # channel is not linked yet, try to find the pair by unique id
            my($id1, $id2) = ($input->{Uniqueid} =~ /^(\d+)\.(\d+)$/);
            my $next_id = $id1 . "." . ($id2+1); # the next id
            for my $candidate (values %channels_from_status) {
                next unless($candidate->{Uniqueid} eq $next_id);
                next if($candidate->{Extension} || $candidate->{Already_Used});
                $link = $candidate;
                last;
            }
        }
        else {
            next STATUS;
        }

        # the side with an Extension is the one that placed the call
        my($src, $dest) = $input->{Extension}           ? ($input, $link)
                        : ($link && $link->{Extension}) ? ($link, $input)
                        :                                 ($input, undef);

        my $begin = get_timestamp($input) - $src->{Seconds};
        my $answered = ($src->{State} eq 'Up');
        my $src_channel = {
            Uniqueid => $src->{Uniqueid},
            Channel => $src->{Channel},
            BeginTimestamp => $begin,
            Disposition => ($answered ? 'ANSWERED' : 'NO ANSWER'),
            Modified => 1,
        };
        if($src_channel->{Disposition} eq 'ANSWERED') {
            # the best guess
            $src_channel->{LinkTimestamp} = $src_channel->{BeginTimestamp};
        }
        $channels{$src_channel->{Uniqueid}} = $src_channel;
        $src->{Already_Used} = 1;
        logmsg(undef, "Add running channel " . $src_channel->{Uniqueid});

        if($dest) {
            my $dest_channel = {
                Uniqueid => $dest->{Uniqueid},
                Channel => $dest->{Channel},
                SrcChannel => $src_channel,
            };
            $channels{$dest_channel->{Uniqueid}} = $dest_channel;
            $dest->{Already_Used} = 1;
            logmsg(undef, "Add running channel " . $dest_channel->{Uniqueid});

            $src_channel->{DestChannel} = $dest_channel;
            $src_channel->{Dest} = $dest->{CallerIDNum};
        }

        # both look the channel up in %channels, so they must come after
        # the registration above
        update_channel_state($src);
        update_channel_callerid($src);

        $src_channel->{Id} = find_record_id($src_channel->{Uniqueid});

        unless($src_channel->{Id}) {
            create_record($src_channel);
        }
        # create_record clears the flag; force the update below to run
        $src_channel->{Modified} = 1;
        update_record($src_channel);
    }
}
 
# =================================================================================================
# Entry point: loads the configuration, creates the Asterisk manager client
# with one callback per tracked manager event, and runs the POE event loop.
# The config file is taken from the first command line argument and
# defaults to 'activecalls.conf'.
sub main {
    # load config
    my $config_filename = $ARGV[0] || 'activecalls.conf';
    # require() searches @INC for names that do not start with "/", "./"
    # or "../", and the current directory is no longer part of @INC on
    # perl 5.26 and later.  Point explicitly at a file that exists
    # relative to the working directory; anything else is still left to
    # the @INC search.
    if($config_filename !~ m{^\.{0,2}/} && -e $config_filename) {
        $config_filename = "./$config_filename";
    }
    require $config_filename;

    # setup
    POE::Component::Client::Asterisk::Manager->new(
        Alias => 'activecalls',
        RemoteHost => $config{ast_host},
        RemotePort => $config{ast_port},
        Username => $config{ast_user},
        Password => $config{ast_password},
        CallBacks => {
            dump => ':all',
            status => {
                'Event' => 'Status',
            },
            status_complete => {
                'Event' => 'StatusComplete',
            },
            newchannel => {
                'Event' => 'Newchannel',
            },
            newcallerid => {
                'Event' => 'Newcallerid',
            },
            hangup => {
                'Event' => 'Hangup',
            },
            newstate => {
                'Event' => 'Newstate',
            },
            dial => {
                'Event' => 'Dial',
            },
            link => {
                'Event' => 'Link',
            },
            newexten => {
                'Event' => 'Newexten',
            },
        },
        inline_states => {
            _connected => sub {
                logmsg(undef, "Logged in");

                my $heap = $_[HEAP];
                $heap->{server}->put({'Action' => 'Status'}); # get list of running channels
            },
            # prints every manager event (matches ':all')
            dump => sub {
                my $input = $_[ARG0];
                require Data::Dumper;
                print Data::Dumper->Dump([$input]);
            },
            # collect the channels reported in answer to 'Action: Status'
            status => sub {
                my $input = $_[ARG0];
                $input->{Timestamp} = (time() . ".000000") if(!$input->{Timestamp});
                $channels_from_status{$input->{Channel}} = $input;
            },
            status_complete => sub {
                find_running_channels();
            },
            newchannel => sub {
                my $input = $_[ARG0];
                if(my $channel = add_channel($input)) {
                    # a row is only created for channels that start in state 'Ring'
                    create_record($channel) if($input->{State} eq 'Ring');
                }
            },
            newcallerid => sub {
                my $input = $_[ARG0];
                if(my $channel = update_channel_callerid($input)) {
                    update_record($channel);
                }
            },
            hangup => sub {
                my $input = $_[ARG0];
                if(my $channel = hangup_channel($input)) {
                    if($config{delete_on_hangup}) {
                        delete_record($channel);
                    }
                    else {
                        close_record($channel);
                    }
                }
            },
            newstate => sub {
                my $input = $_[ARG0];
                if(my $channel = update_channel_state($input)) {
                    if($channel->{Id}) {
                        update_record($channel);
                    }
                    elsif($input->{State} eq 'Ring') {
                        create_record($channel);
                    }
                }
            },
            dial => sub {
                my $input = $_[ARG0];
                if(my $channel = update_channel_dest($input)) {
                    update_record($channel);
                }
            },
            link => sub {
                my $input = $_[ARG0];
                if(my $channel = update_channel_link($input)) {
                    update_record($channel);
                }
            },
            newexten => sub {
                my $input = $_[ARG0];
                if(my $channel = update_channel_exten($input)) {
                    update_record($channel);
                }
            },
        },
    );

    # listen
    logmsg(undef, "Listen...");
    $poe_kernel->run();
}

# =================================================================================================
main();
 
Property changes:
Added: svn:executable
/asterisk-activecalls/table.pgsql
0,0 → 1,22
-- Source of the acctid values; activecalls.pl reads the sequence named by its db_seq setting with nextval().
CREATE SEQUENCE activecalls_acctid_seq;
 
-- One row per call tracked by activecalls.pl.  On hangup the row is either
-- deleted or completed with duration/billsec, depending on delete_on_hangup.
CREATE TABLE activecalls (
acctid bigint not null, -- record id, taken from the sequence
calldate timestamp with time zone not null, -- start of the call (channel BeginTimestamp)
linkdate timestamp with time zone, -- time the channel was linked (LinkTimestamp)
clid character varying(80), -- full caller id: "name" <number>, number or "name"
src character varying(80), -- caller id number
dst character varying(80), -- destination (Dest of the channel)
dcontext character varying(80), -- context of the channel
channel character varying(80), -- name of the source channel
dstchannel character varying(80), -- name of the destination channel
lastapp character varying(80), -- last application seen on the channel
lastdata character varying(80), -- data of that application
duration integer, -- seconds from call start to hangup, written on hangup
billsec integer, -- seconds from link to hangup (0 if never linked), written on hangup
disposition character varying(45), -- NO ANSWER, BUSY, ANSWERED or FAILED
amaflags integer, -- not set by activecalls.pl
accountcode character varying(20), -- not set by activecalls.pl
uniqueid character varying(32), -- Uniqueid of the channel; used to find existing rows
userfield character varying(255) -- not set by activecalls.pl
);
/asterisk-activecalls/activecalls.conf-sample
0,0 → 1,21
# == config =======================================================================================
# Declared with `our` so that the assignment reaches the package variable
# %config that activecalls.pl reads.  A `my` hash would stay private to
# this file and the script would see no settings at all.
our %config = (
    db_host            => 'localhost',
    db_port            => 5432,
    db_name            => 'asterisk',
    db_user            => 'activecalls',
    db_password        => 'the db password',
    db_table           => 'activecalls',
    db_seq             => 'activecalls_acctid_seq',
    db_connect_retries => 60,
    db_connect_pause   => 10, # seconds

    ast_host     => 'localhost',
    ast_port     => 5038,
    ast_user     => 'activecalls',
    ast_password => 'the manager password',

    delete_on_hangup => 1, # delete records in DB on channel hangup
);
 
1;
/asterisk-activecalls/activecalls.conf
0,0 → 1,20
# Live settings for activecalls.pl.
# NOTE(review): this file carries real-looking database and manager
# passwords and is under version control - consider rotating them and
# keeping the file out of the repository.
%config = (
    db_host            => 'sun',
    db_port            => 5432,
    db_name            => 'asterisk',
    db_user            => 'activecalls',
    db_password        => 'X21VMTAgZwJBexHUTVzl887Odt1IK6cd',
    db_table           => 'activecalls',
    db_seq             => 'activecalls_acctid_seq',
    db_connect_retries => 60,
    db_connect_pause   => 10, # seconds

    ast_host     => 'sip',
    ast_port     => 5038,
    ast_user     => 'activecalls',
    ast_password => 'TVcIf3y6vftm6yWJsgOldFmYThvIL8Iy',

    delete_on_hangup => 0,
);
 
1;