Subversion Repositories general

Compare Revisions

No changes between revisions

Ignore whitespace Rev 1372 → Rev 1373

/asterisk-activecalls/trunk/activecalls.pl
0,0 → 1,650
#!/usr/local/bin/perl -w
 
# TODO:
# - 'dst' is name, not number if called SIP internally (?)
 
# fields not set: amaflags, accountcode, userfield
 
# One could issue 'Action: Status' directly every time a list of active calls is needed,
# without an intermediate database, but that gives no information about the link time,
# and while a channel is still in state 'Ring' there is no reliable way to find its peer channel.
# Alternatively, one can use 'Action: Command\r\nCommand: core show channel ...', which includes
# the full CDR info, but it is very slow (several seconds in my test with two channels).
 
# == globals ======================================================================================
use strict;
use vars;
use subs;
use DBI;
use POE qw( Component::Client::Asterisk::Manager );
 
our %config; # settings read throughout the script (db_*, ast_*, delete_on_hangup); presumably filled in by the config file that main() loads via require
my $db; # DBI database handle; (re)opened on demand by db_connect()
my %channels; # active channels, keyed by Asterisk Uniqueid
my %channels_from_status; # 'Status' events of channels already running on startup, keyed by channel name
 
# =================================================================================================
# Write one log line to STDERR in the form "<timestamp> : <message>".
#   $_[0] - event hash ref or undef; when it carries a true Timestamp field,
#           that value is used as the line prefix
#   $_[1] - message text
# Without an event timestamp the current epoch time followed by a single
# space is used as the prefix.
sub logmsg {
    my ($event, $text) = @_;

    my $prefix;
    if ($event && $event->{Timestamp}) {
        $prefix = $event->{Timestamp};
    }
    else {
        $prefix = time() . " ";
    }

    print STDERR "$prefix : $text\n";
}
 
# =================================================================================================
# Make sure the global $db holds a live database handle, (re)connecting
# when necessary.
#
# Reads from %config:
#   db_name, db_host, db_port, db_user, db_password - connection parameters
#   db_connect_retries - number of connection attempts (at least one is made)
#   db_connect_pause   - seconds to sleep between two attempts
#
# Dies, including the DBI error text, when no connection can be established.
sub db_connect {
    # Fast path: the existing handle still answers a ping.
    return if $db && $db->ping();

    # Either there is no handle yet or it has gone stale.
    undef $db;

    my $attempts = $config{db_connect_retries};
    $attempts = 1 unless $attempts && $attempts > 0; # always try at least once

    my $dsn = 'DBI:Pg:dbname=' . $config{db_name}
        . ';host=' . $config{db_host} . ';port=' . $config{db_port};

    for my $attempt (1 .. $attempts) {
        logmsg(undef, "Connnect to database");
        $db = DBI->connect($dsn, $config{db_user}, $config{db_password});
        return if $db;

        # Pausing only makes sense if another attempt follows.
        sleep($config{db_connect_pause}) if $attempt < $attempts;
    }

    die "Cannot connect to database: " . ($DBI::errstr || 'unknown error');
}
 
# =================================================================================================
# Tell whether a caller id value carries real information.
#   $_[0] - caller id number or name (may be undef)
# Returns a false value for undef/empty/'0' and for the placeholders
# '<unknown>' and '<Unknown>'; a true value otherwise.
sub caller_id_set {
    my ($candidate) = @_;

    # Nothing there at all: hand the false value straight back.
    return $candidate unless $candidate;

    return !($candidate eq '<unknown>' || $candidate eq '<Unknown>');
}
 
# =================================================================================================
# Return the time of an event as whole epoch seconds.
#   $_[0] - event hash ref or undef
# Uses the integer part of the event's Timestamp field when that field is
# present and true, otherwise the current time().
sub get_timestamp {
    my ($input) = @_;

    if ($input && $input->{Timestamp}) {
        return int($input->{Timestamp});
    }

    return time();
}
 
# =================================================================================================
# Derive the caller id parts of a channel.
#   $_[0] - channel hash ref; its CallerIDNum and CallerIDName fields are read
# Returns a hash ref with the keys
#   num  - the number, if caller_id_set() accepts it, else undef
#   name - the name, if caller_id_set() accepts it, else undef
#   full - '"name" <num>', just the number, or '"name"', depending on which
#          parts are available; undef when neither is
sub find_caller_id {
    my ($channel) = @_;

    my $number = caller_id_set($channel->{CallerIDNum})  ? $channel->{CallerIDNum}  : undef;
    my $name   = caller_id_set($channel->{CallerIDName}) ? $channel->{CallerIDName} : undef;

    my $full;
    if ($number) {
        $full = $name ? "\"$name\" <$number>" : $number;
    }
    elsif ($name) {
        $full = "\"$name\"";
    }

    return { full => $full, num => $number, name => $name };
}
 
# =================================================================================================
# Insert a new row for a channel into the table $config{db_table}.
#   $_[0] - channel hash ref
# The row id is drawn from the sequence $config{db_seq} and stored in
# $channel->{Id}; calldate is derived from $channel->{BeginTimestamp}.
# Of the remaining columns only those with a true value are written:
# uniqueid, channel, disposition, clid and src.
# Clears $channel->{Modified}. Dies on any database failure.
sub create_record {
    my ($channel) = @_;

    my $caller = find_caller_id($channel);

    # Column name / value pairs in the order they appear in the statement.
    my @candidates = (
        [ 'uniqueid',    $channel->{Uniqueid} ],
        [ 'channel',     $channel->{Channel} ],
        [ 'disposition', $channel->{Disposition} ],
        [ 'clid',        $caller->{full} ],
        [ 'src',         $caller->{num} ],
    );

    my (@columns, @binds);
    for my $pair (@candidates) {
        my ($column, $value) = @$pair;
        next unless $value;
        push(@columns, $column);
        push(@binds, $value);
    }

    db_connect();
    my $sequence_row = $db->selectrow_hashref("select nextval('" . $config{db_seq} . "')");
    die "Cannot get next acctid" unless $sequence_row;
    $channel->{Id} = $sequence_row->{nextval};

    logmsg(undef, "Create a new record " . $channel->{Id} . " for channel " . $channel->{Uniqueid});

    my $sql = 'insert into ' . $config{db_table} . ' (acctid,calldate,'
        . join(',', @columns)
        . ") values (?,TIMESTAMP WITH TIME ZONE 'epoch'+?*INTERVAL '1 second',"
        . join(',', ('?') x @binds) . ')';
    my $st = $db->prepare($sql);
    die "Cannot create prepared statement" unless $st;

    # The two fixed placeholders (acctid, calldate) come first.
    $st->execute($channel->{Id}, $channel->{BeginTimestamp}, @binds)
        or die "Cannot execute prepared statement";
    $channel->{Modified} = 0;
}
 
# =================================================================================================
# Write the current state of a channel to its existing database row.
#   $_[0] - channel hash ref
# Does nothing unless the channel has a record id (Id) and is flagged as
# Modified. Only fields that are set are included in the UPDATE: linkdate,
# disposition, dcontext, lastapp, lastdata, dst, clid, src and dstchannel
# (taken from the linked DestChannel). Clears the Modified flag afterwards.
# Dies on any database failure.
sub update_record {
    my ($channel) = @_;

    return unless $channel->{Id} && $channel->{Modified};

    logmsg(undef, "Update record " . $channel->{Id} . " for channel " . $channel->{Uniqueid});

    my (@assignments, @binds);
    # Queue one "column=?" clause together with its bind value.
    my $set = sub {
        my ($clause, $value) = @_;
        push(@assignments, $clause);
        push(@binds, $value);
    };

    $set->("linkdate=TIMESTAMP WITH TIME ZONE 'epoch'+?*INTERVAL '1 second'",
        $channel->{LinkTimestamp}) if $channel->{LinkTimestamp};
    $set->('disposition=?', $channel->{Disposition}) if $channel->{Disposition};
    $set->('dcontext=?', $channel->{Context}) if $channel->{Context};
    $set->('lastapp=?', $channel->{Application}) if $channel->{Application};
    # An empty string is a valid application argument, hence defined().
    $set->('lastdata=?', $channel->{AppData}) if defined($channel->{AppData});
    $set->('dst=?', $channel->{Dest}) if $channel->{Dest};

    my $caller = find_caller_id($channel);
    $set->('clid=?', $caller->{full}) if $caller->{full};
    $set->('src=?', $caller->{num}) if $caller->{num};

    my $peer = $channel->{DestChannel};
    $set->('dstchannel=?', $peer->{Channel}) if $peer && $peer->{Channel};

    if (@assignments) {
        db_connect();
        my $sql = 'update ' . $config{db_table} . ' set ' . join(',', @assignments)
            . ' where acctid=?';
        my $st = $db->prepare($sql);
        die "Cannot create prepared statement" unless $st;

        $st->execute(@binds, $channel->{Id}) or die "Cannot execute prepared statement";
    }
    $channel->{Modified} = 0;
}
 
# =================================================================================================
# Remove the database row that belongs to a channel.
#   $_[0] - channel hash ref
# Does nothing when the channel has no record id (Id). Clears the Modified
# flag afterwards. Dies on any database failure.
sub delete_record {
    my ($channel) = @_;

    my $record_id = $channel->{Id};
    return unless $record_id;

    logmsg(undef, "Delete record " . $record_id . " for channel " . $channel->{Uniqueid});

    db_connect();
    my $sql = 'delete from ' . $config{db_table} . ' where acctid=?';
    my $st = $db->prepare($sql);
    die "Cannot create prepared statement" unless $st;

    $st->execute($record_id) or die "Cannot execute prepared statement";
    $channel->{Modified} = 0;
}
 
# =================================================================================================
# Finalize the database row of a channel that has ended.
#   $_[0] - channel hash ref; EndTimestamp and BeginTimestamp are expected
#           to be set
# Does nothing when the channel has no record id (Id). Writes disposition,
# lastapp and lastdata when set, plus
#   duration = EndTimestamp - BeginTimestamp
#   billsec  = EndTimestamp - LinkTimestamp, or 0 if the call was never linked
# Clears the Modified flag afterwards. Dies on any database failure.
sub close_record {
    my ($channel) = @_;

    return unless $channel->{Id};

    logmsg(undef, "Close record " . $channel->{Id} . " for channel " . $channel->{Uniqueid});

    my (@assignments, @binds);

    for my $field ([ 'disposition=?', 'Disposition' ], [ 'lastapp=?', 'Application' ]) {
        my ($clause, $key) = @$field;
        next unless $channel->{$key};
        push(@assignments, $clause);
        push(@binds, $channel->{$key});
    }
    # An empty string is a valid application argument, hence defined().
    if (defined($channel->{AppData})) {
        push(@assignments, 'lastdata=?');
        push(@binds, $channel->{AppData});
    }

    my $duration = $channel->{EndTimestamp} - $channel->{BeginTimestamp};
    my $billsec = 0;
    if ($channel->{LinkTimestamp}) {
        $billsec = $channel->{EndTimestamp} - $channel->{LinkTimestamp};
    }
    push(@assignments, 'duration=?', 'billsec=?');
    push(@binds, $duration, $billsec);

    db_connect();
    my $sql = 'update ' . $config{db_table} . ' set ' . join(',', @assignments)
        . ' where acctid=?';
    my $st = $db->prepare($sql);
    die "Cannot create prepared statement" unless $st;

    $st->execute(@binds, $channel->{Id}) or die "Cannot execute prepared statement";
    $channel->{Modified} = 0;
}
 
# =================================================================================================
# Look up the record id (acctid) stored for an Asterisk unique id.
#   $_[0] - unique id of the channel
# Returns the acctid of the first matching row, or undef when there is none.
# Dies on any database failure.
sub find_record_id {
    my ($unique_id) = @_;

    db_connect();
    my $sql = 'select acctid from ' . $config{db_table} . ' where uniqueid=?';
    my $st = $db->prepare($sql);
    die "Cannot create prepared statement" unless $st;

    $st->execute($unique_id) or die "Cannot execute prepared statement";

    my @row = $st->fetchrow_array;
    unless (@row) {
        logmsg(undef, "Search record id for UniqueId " . $unique_id . ": not found");
        return undef;
    }

    logmsg(undef, "Search record id for UniqueId " . $unique_id . ": " . $row[0]);
    return $row[0];
}
 
# =================================================================================================
# Start tracking a new channel.
#   $_[0] - event hash ref; Uniqueid and Channel are read, the caller id
#           fields are handed on to update_channel_callerid()
# Registers the channel in %channels under its unique id with the initial
# disposition 'NO ANSWER' and the Modified flag set.
# Returns the new channel hash ref.
sub add_channel {
    my ($input) = @_;

    my $unique_id = $input->{Uniqueid};

    my %record = (
        Uniqueid       => $unique_id,
        Channel        => $input->{Channel},
        BeginTimestamp => get_timestamp($input),
        Disposition    => 'NO ANSWER',
        Modified       => 1,
    );
    $channels{$unique_id} = \%record;

    # The channel is registered now, so the caller id can be merged in.
    update_channel_callerid($input);

    logmsg($input, "Add channel " . $record{Uniqueid});

    return \%record;
}
 
# =================================================================================================
# Merge caller id information from an event into the tracked channel.
#   $_[0] - event hash ref; Uniqueid, CallerIDNum, CallerID and CallerIDName
#           are read
# A usable CallerIDNum always wins; CallerID is only taken over while the
# channel has no usable number yet. A true CallerIDName replaces the stored
# name. The channel is flagged as Modified in any case.
# Returns the channel hash ref, or undef if the channel is not tracked.
sub update_channel_callerid {
    my ($input) = @_;

    my $channel = $channels{$input->{Uniqueid}};
    return $channel unless $channel;

    my $number;
    if (caller_id_set($input->{CallerIDNum})) {
        $number = $input->{CallerIDNum};
    }
    elsif (caller_id_set($input->{CallerID}) && !caller_id_set($channel->{CallerIDNum})) {
        $number = $input->{CallerID};
    }
    $channel->{CallerIDNum} = $number if defined($number);

    if ($input->{CallerIDName}) {
        $channel->{CallerIDName} = $input->{CallerIDName};
    }
    $channel->{Modified} = 1;

    logmsg($input, "Update channel CallerID " . $channel->{Uniqueid} . " -> "
        . $channel->{CallerIDNum} . "/" . $channel->{CallerIDName});

    return $channel;
}
 
# =================================================================================================
# Connect a source channel with the destination channel it dials.
#   $_[0] - event hash ref; SrcUniqueID and DestUniqueID are read
# Stores the destination in the source channel's DestChannel field and the
# source in the destination channel's SrcChannel field, flagging every
# channel found as Modified.
# Returns the source channel hash ref, or undef if it is not tracked.
sub update_channel_dest {
    my $input = shift @_;

    logmsg($input, "Update channel dest " . $input->{SrcUniqueID} . " -> " . $input->{DestUniqueID});
    my $src_channel = $channels{$input->{SrcUniqueID}};
    my $dest_channel = $channels{$input->{DestUniqueID}};
    if($src_channel) {
        $src_channel->{DestChannel} = $dest_channel;
        $src_channel->{Modified} = 1;
    }
    if($dest_channel) {
        $dest_channel->{SrcChannel} = $src_channel;
        if($src_channel) {
            # Source and destination point at each other. Keep the back
            # reference weak, otherwise the pair forms a reference cycle and
            # neither hash is ever freed after both channels have hung up.
            require Scalar::Util;
            Scalar::Util::weaken($dest_channel->{SrcChannel});
        }
        $dest_channel->{Modified} = 1;
    }

    return $src_channel;
}
 
# =================================================================================================
# React to a state change of a tracked channel.
#   $_[0] - event hash ref; Uniqueid and State are read
# Only the state 'Busy' has an effect: it sets the disposition to 'BUSY'
# and flags the channel as Modified.
# Returns the channel hash ref, or undef if the channel is not tracked.
sub update_channel_state {
    my ($input) = @_;

    my $channel = $channels{$input->{Uniqueid}};

    if ($channel && $input->{State} eq 'Busy') {
        @{$channel}{qw(Disposition Modified)} = ('BUSY', 1);
        logmsg($input, "Update channel disposition " . $channel->{Uniqueid} . " -> "
            . $channel->{Disposition});
    }

    return $channel;
}
 
# =================================================================================================
# Mark a tracked channel as answered when it gets linked.
#   $_[0] - event hash ref; the channel is looked up by Uniqueid1
# Records the event time as LinkTimestamp, sets the disposition to
# 'ANSWERED' and flags the channel as Modified.
# Returns the channel hash ref, or undef if the channel is not tracked.
sub update_channel_link {
    my ($input) = @_;

    my $channel = $channels{$input->{Uniqueid1}};
    return $channel unless $channel;

    $channel->{LinkTimestamp} = get_timestamp($input);
    $channel->{Disposition}   = 'ANSWERED';
    $channel->{Modified}      = 1;

    logmsg($input, "Update channel disposition " . $channel->{Uniqueid} . " -> "
        . $channel->{Disposition});

    return $channel;
}
 
# =================================================================================================
# Record the dialplan position of a tracked channel.
#   $_[0] - event hash ref; Uniqueid, Application, AppData, Context and
#           Extension are read
# Application and AppData always follow the event. Context and Dest are
# only taken from the first event that provides them. The channel is
# flagged as Modified whenever one of these values changes; a flag that is
# already set is left alone.
# Returns the channel hash ref, or undef if the channel is not tracked.
sub update_channel_exten {
    my $input = shift @_;

    my $channel = $channels{$input->{Uniqueid}};
    return $channel unless($channel);

    for my $field (qw(Application AppData)) {
        my $old = $channel->{$field};
        my $new = $input->{$field};

        # Compare without tripping over undef: on the first event of a
        # channel neither field has been stored yet.
        my $changed = (defined($old) xor defined($new))
            || (defined($old) && $old ne $new);

        # Only ever raise the flag; clearing it is the job of the record
        # functions once the change has been written.
        $channel->{Modified} = 1 if($changed);
        $channel->{$field} = $new;
    }

    if(!$channel->{Context}) {
        $channel->{Context} = $input->{Context};
        $channel->{Modified} = 1;
        logmsg($input, "Update channel context " . $channel->{Uniqueid} . " -> "
            . $channel->{Context});
    }
    if(!$channel->{Dest}) {
        $channel->{Dest} = $input->{Extension};
        $channel->{Modified} = 1;
    }

    return $channel;
}
 
# =================================================================================================
# Stop tracking a channel that has hung up.
#   $_[0] - event hash ref; Uniqueid, Cause and Cause-txt are read
# Records the event time as EndTimestamp, flags the channel as Modified and
# maps the hangup cause to a disposition:
#   0, 16 - normal clearing, disposition is left as it is
#   17    - 'BUSY'
#   19    - 'NO ANSWER'
#   other - 'FAILED'
# The channel is then removed from %channels.
# Returns the channel hash ref, or undef if the channel was not tracked.
sub hangup_channel {
    my ($input) = @_;

    my $unique_id = $input->{Uniqueid};
    my $channel = $channels{$unique_id};
    return $channel unless $channel;

    $channel->{EndTimestamp} = get_timestamp($input);
    $channel->{Modified} = 1;

    my $cause = $input->{Cause};
    unless ($cause == 0 || $cause == 16) {
        # anything but normal clearing changes the disposition;
        # an unknown cause is treated as a failure
        $channel->{Disposition} = $cause == 17 ? 'BUSY'
                                : $cause == 19 ? 'NO ANSWER'
                                :                'FAILED';
    }

    logmsg($input, "Remove channel " . $input->{Uniqueid} . ", cause " . $input->{Cause}
        . " (" . $input->{'Cause-txt'} . ")");
    delete $channels{$input->{Uniqueid}};

    return $channel;
}
 
# =================================================================================================
# Rebuild the channel list from the 'Status' events collected at startup.
#
# Walks over %channels_from_status, pairs each channel with its peer and
# registers both in %channels. The peer is the channel named in the Link
# field; for a channel that is still in state 'Ring' it is guessed as the
# channel with the next unique id that has no Extension. Channels that are
# neither linked nor ringing are skipped. Of a pair, the channel carrying
# an Extension is taken as the source.
#
# For each source channel the database row is looked up by unique id,
# created if missing, and then updated. The collected status events are
# discarded at the end.
sub find_running_channels {
    require Scalar::Util;

    for my $input (values %channels_from_status) {
        next if($input->{Already_Used});

        my $state = defined($input->{State}) ? $input->{State} : '';
        my($src, $dest, $link);

        if($input->{Link}) {
            $link = $channels_from_status{$input->{Link}};
        }
        elsif($state eq 'Ring') {
            # channel is not linked yet, try to find the pair by unique id;
            # without a parsable unique id there is nothing to guess from
            if(defined($input->{Uniqueid}) && $input->{Uniqueid} =~ /^(\d+)\.(\d+)$/) {
                my $next_id = $1 . "." . ($2 + 1); # the next id
                for my $candidate (values %channels_from_status) {
                    if(defined($candidate->{Uniqueid}) && $candidate->{Uniqueid} eq $next_id
                        && !$candidate->{Extension} && !$candidate->{Already_Used}) {
                        $link = $candidate;
                        last;
                    }
                }
            }
        }
        else {
            next;
        }

        # the side with an extension is the one that placed the call
        if($input->{Extension}) {
            $src = $input;
            $dest = $link;
        }
        elsif($link && $link->{Extension}) {
            $src = $link;
            $dest = $input;
        }
        else {
            $src = $input;
        }

        my $answered = (defined($src->{State}) && $src->{State} eq 'Up');
        my $src_channel = {
            Uniqueid => $src->{Uniqueid},
            Channel => $src->{Channel},
            # a missing Seconds field counts as zero elapsed seconds
            BeginTimestamp => get_timestamp($input) - ($src->{Seconds} || 0),
            Disposition => ($answered ? 'ANSWERED' : 'NO ANSWER'),
            Modified => 1,
        };
        $src_channel->{LinkTimestamp} = $src_channel->{BeginTimestamp}
            if($answered); # the best guess
        $channels{$src_channel->{Uniqueid}} = $src_channel;
        $src->{Already_Used} = 1;
        logmsg(undef, "Add running channel " . $src_channel->{Uniqueid});

        if($dest) {
            my $dest_channel = {
                Uniqueid => $dest->{Uniqueid},
                Channel => $dest->{Channel},
                SrcChannel => $src_channel,
            };
            # Source and destination point at each other. Keep the back
            # reference weak, otherwise the pair forms a reference cycle and
            # neither hash is ever freed after both channels have hung up.
            Scalar::Util::weaken($dest_channel->{SrcChannel});
            $channels{$dest_channel->{Uniqueid}} = $dest_channel;
            $dest->{Already_Used} = 1;
            logmsg(undef, "Add running channel " . $dest_channel->{Uniqueid});

            $src_channel->{DestChannel} = $dest_channel;
            $src_channel->{Dest} = $dest->{CallerIDNum};
        }

        update_channel_state($src);
        update_channel_callerid($src);

        $src_channel->{Id} = find_record_id($src_channel->{Uniqueid});

        create_record($src_channel) unless($src_channel->{Id});
        # create_record() clears the flag, but the row still lacks the
        # fields that only update_record() writes
        $src_channel->{Modified} = 1;
        update_record($src_channel);
    }

    # The snapshot has been consumed; drop it so that stale entries cannot
    # leak into a later status run.
    %channels_from_status = ();
}
 
# =================================================================================================
# Load the configuration, set up the Asterisk manager client and run the
# POE event loop.
#
# The configuration file name is taken from the first command line
# argument and defaults to 'activecalls.conf'. The file is loaded with
# require, so it has to return a true value.
#
# Every manager event is dumped to STDOUT by the 'dump' callback; the other
# callbacks keep %channels and the database in step with the events
# Status, StatusComplete, Newchannel, Newcallerid, Hangup, Newstate, Dial,
# Link and Newexten.
sub main {
    # load config
    my $config_filename = $ARGV[0] || 'activecalls.conf';
    # require searches @INC for relative names, and since Perl 5.26 @INC no
    # longer contains the current directory. Resolve a file that exists
    # relative to the working directory to an absolute path; anything else
    # is still looked up in @INC as before.
    if(-e $config_filename) {
        require File::Spec;
        $config_filename = File::Spec->rel2abs($config_filename);
    }
    require $config_filename;

    # setup
    my $client = POE::Component::Client::Asterisk::Manager->new(
        Alias => 'activecalls',
        RemoteHost => $config{ast_host},
        RemotePort => $config{ast_port},
        Username => $config{ast_user},
        Password => $config{ast_password},
        # each callback name below is dispatched to the inline state of the
        # same name
        CallBacks => {
            dump => ':all',
            status => {
                'Event' => 'Status',
            },
            status_complete => {
                'Event' => 'StatusComplete',
            },
            newchannel => {
                'Event' => 'Newchannel',
            },
            newcallerid => {
                'Event' => 'Newcallerid',
            },
            hangup => {
                'Event' => 'Hangup',
            },
            newstate => {
                'Event' => 'Newstate',
            },
            dial => {
                'Event' => 'Dial',
            },
            link => {
                'Event' => 'Link',
            },
            newexten => {
                'Event' => 'Newexten',
            },
        },
        inline_states => {
            _connected => sub {
                logmsg(undef, "Logged in");

                my $heap = $_[HEAP];
                $heap->{server}->put({'Action' => 'Status'}); # get list of running channels
            },
            dump => sub {
                my $input = $_[ARG0];
                require Data::Dumper;
                print Data::Dumper->Dump([$input]);
            },
            status => sub {
                my $input = $_[ARG0];
                # give the event a timestamp in the same format as the
                # manager's own, if it came without one
                $input->{Timestamp} = (time() . ".000000") if(!$input->{Timestamp});
                $channels_from_status{$input->{Channel}} = $input;
            },
            status_complete => sub {
                find_running_channels();
            },
            newchannel => sub {
                my $input = $_[ARG0];
                if(my $channel = add_channel($input)) {
                    # a record is only created for a channel that starts
                    # out in state 'Ring'
                    create_record($channel) if($input->{State} eq 'Ring');
                }
            },
            newcallerid => sub {
                my $input = $_[ARG0];
                if(my $channel = update_channel_callerid($input)) {
                    update_record($channel);
                }
            },
            hangup => sub {
                my $input = $_[ARG0];
                if(my $channel = hangup_channel($input)) {
                    if($config{delete_on_hangup}) {
                        delete_record($channel);
                    }
                    else {
                        close_record($channel);
                    }
                }
            },
            newstate => sub {
                my $input = $_[ARG0];
                if(my $channel = update_channel_state($input)) {
                    if($channel->{Id}) {
                        update_record($channel);
                    }
                    elsif($input->{State} eq 'Ring') {
                        # the channel reached 'Ring' only after it was created
                        create_record($channel);
                    }
                }
            },
            dial => sub {
                my $input = $_[ARG0];
                if(my $channel = update_channel_dest($input)) {
                    update_record($channel);
                }
            },
            link => sub {
                my $input = $_[ARG0];
                if(my $channel = update_channel_link($input)) {
                    update_record($channel);
                }
            },
            newexten => sub {
                my $input = $_[ARG0];
                if(my $channel = update_channel_exten($input)) {
                    update_record($channel);
                }
            },
        },
    );

    # listen
    logmsg(undef, "Listen...");
    $poe_kernel->run();
}
 
# =================================================================================================
# Script entry point: main() loads the configuration and runs the event loop.
main();
 
Property changes:
Added: svn:executable