panasonic-log-reader/www/index.py

253 lines
12 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# coding=utf-8
import os, sys, pymysql, re, time
from ldap3 import Server, Connection, NTLM
def application(environ, start_response):
#sys.stdout._encoding = 'utf-8'
status = '200 OK'
# проверяем наличие параметров в запросе и всяко-разно реагируем
if environ['QUERY_STRING'] == '':
output = bytes((header() + body() + footer()).encode('utf8'))
elif environ['QUERY_STRING'].split('&'):
paramDict = {item.split('=')[0]: item.split('=')[1] for item in environ['QUERY_STRING'].split('&')}
if paramDict.get('query_type') == 'internal':
output = bytes((header() + body() + getInternalNumbers() + footer()).encode('utf8'))
elif paramDict.get('query_type') == 'external':
output = bytes((header() + body() + getCOline() + footer()).encode('utf8'))
elif paramDict.get('query_type') == 'report':
output = bytes((header() + body() + ReportForm() + ReportData(environ) + footer()).encode('utf8'))
elif paramDict.get('query_type') == 'ldap':
domain = paramDict.get('domain')
group = paramDict.get('group')
output = bytes((header() + body() + getLDAPusers(domain, group) + footer()).encode('utf8'))
else:
output = bytes((header() + body() + footer()).encode('utf8'))
response_headers = [('Content-type', 'text/html;charset=utf-8'),
('Content-Length', str(len(output)))]
start_response(status, response_headers)
return [output]
def header():
txtHeader = '<html><head><meta charset="utf-8"><link rel="shortcut icon" href="favicon.png" >\n' \
'<link rel="icon" type="image/png" href="favicon.png" >\n' \
'<link href="css/layout.css" rel="stylesheet" type="text/css" />\n' \
'<title>Телефоны</title>\n' \
'<link href="css/menu.css" rel="stylesheet" type="text/css" /></head><body>\n'
return txtHeader
def footer():
txtFooter = '</body></html>\n'
return txtFooter
def body():
txtBody = '<table><tr><td><h2 align=left>ТЕЛЕФОНЫ</h2></td></tr>\n' \
'<tr><td>{}</td></tr></table>\n'.format(menu())
return txtBody
def menu():
#<div class="container">
txtMenu = '<div class="container"><ul id="nav"></li>\n' \
'<li><a class="hsubs" href="#">Справочник</a><ul class="subs">\n' \
'<li><a href="?query_type=external">Городские телефоны</a></li>\n' \
'<li><a href="?query_type=ldap&domain=domain1">Список абонентов 1</a></li>\n' \
'<li><a href="?query_type=ldap&domain=domain2">Список абонентов 2</a></li>\n' \
'<li><a href="?query_type=ldap&domain=domain3">Список абонентов 3</a></li></ul></li>\n' \
'<li><a class="hsubs" href="">Отчёты</a><ul class="subs">' \
'<li><a href="?query_type=report&report_type=int">Список звонков по номеру</a></li>\n' \
'<li><a href="?query_type=report&report_type=dep">Звонки по отделам</a></li>\n' \
'</ul></div>\n'
return txtMenu
def getInternalNumbers():
conn = connectDB()
c = conn.cursor()
order = 'int_number'
result = '<h4 align=left>Список внутренних телефонов</h4>'
qwr = "SELECT * FROM int_number ORDER by " + order
c.execute(qwr)
listHeader = '<table class="table_dark"><tr><th>№ п/п</th>\n' \
'<th><a href=index.py?query_type=internal&order=int_number>Телефон</a></th>\n' \
'<th><a href=index.py?query_type=internal&order=fio>ФИО</a></th>\n' \
'<th>Описание<th>Электро-почта</th></tr>\n'
result = result + listHeader
for row in c.fetchall():
rowData = "<tr><td>%s</td><td>%s</td><td>%s</td><td>%s</td><td>%s</td></tr>\n" % (row[0], row[1], row[2], row[4], row[3])
result = result + rowData
return (result + "</table>\n")
# Список пользователей (ФИО, телефон, почта, отдел) из LDAP (AD)
def getLDAPusers(domain, group):
order = 'int_number'
if domain == 'domain1':
head = '<p>Для внешних абонентов звонить на xxxxxx + добавочный номер</p>'
org = '1'
s = Server('192.168.1.1')
c = Connection(s, user='domain1\\phone', password="pass", authentication=NTLM)
# perform the Bind operation
if not c.bind():
print('error in bind', c.result)
c.search('OU=users,dc=domain1,dc=local', '(objectclass=person)',
attributes=['cn', 'mail', 'telephoneNumber', 'department', 'title', 'mobile'])
elif domain == 'domain2':
head = '<p>Для внешних абонентов звонить на xxxxxxx + добавочный номер</p>'
org = '2'
s = Server('192.168.2.2')
c = Connection(s, user='domain2\\phone', password="pass", authentication=NTLM)
# perform the Bind operation
if not c.bind():
print('error in bind', c.result)
c.search('OU=users,dc=domain2,dc=local', '(objectclass=person)',
attributes=['cn', 'mail', 'telephoneNumber', 'department', 'title', 'mobile'])
elif domain == 'domain3':
head = '<p>Для внешних абонентов звонить на xxxxxxx + добавочный номер</p>'
org = '3'
s = Server('192.168.3.3')
c = Connection(s, user='domain3\\phone', password="pass", authentication=NTLM)
# perform the Bind operation
if not c.bind():
print('error in bind', c.result)
c.search('OU=users,dc=domain3,dc=local', '(objectclass=person)',
attributes=['cn', 'mail', 'telephoneNumber', 'department', 'title', 'mobile'])
result = '<h4 align=left>Список абонентов %s</h4>\n%s' % (org, head)
#return (result + "</table>\n")
# listHeader = '<table class="table_dark"><tr>\n' \
# '<th><a href=index.py?query_type=internal&order=fio>ФИО</a></th>\n' \
# '<th>Отдел</th><th>Должность</th>\n' \
# '<th><a href=index.py?query_type=internal&order=int_number>Телефон</a></th>\n' \
# '<th>Эл.почта</th><th>Мобильный телефон</th></tr>\n'
listHeader = '<table class="table_dark"><tr>\n' \
'<th>ФИО</th>\n' \
'<th>Отдел</th><th>Должность</th>\n' \
'<th>Телефон</th>\n' \
'<th>Эл.почта</th><th>Мобильный телефон</th></tr>\n'
result = result + listHeader
for item in c.entries:
# print(item)
# item = re.sub('\n', '', str(item))
item = str(item)
name = re.search('(cn:)(.+?)(\n)', item)
if name:
name = name.groups()[1]
else:
name=name
#print(name.groups()[1])
department = re.search("(department:)(.+?)(\n)", item)
if department:
dep = department.groups()[1]
else:
dep = ""
title = re.search("(title:)(.+?)(\n)", item)
if title:
title = title.groups()[1]
else:
title = ""
mail = re.search("(mail:)(.+?)(\n)", item)
if mail:
mail = mail.groups()[1]
else:
mail = ""
telephone = re.search("(telephoneNumber:)(.+?)(\n)", item)
if telephone:
phone = telephone.groups()[1]
else:
phone = ""
mobile = re.search("(mobile:)(.+?)(\n)", item)
if mobile:
mobile = mobile.groups()[1]
else:
mobile = ""
rowData = "<tr><td>%s</td><td>%s</td><td>%s</td><td>%s</td><td><a href=\"mailto:%s\">%s</a></td><td>%s</td></tr>\n" % (name, dep, title, phone, mail, mail, mobile)
result = result + rowData
return (result + "</table>\n")
def getCOline():
conn = connectDB()
c = conn.cursor()
order = 'ext_co_line'
result = '<h4 align=left>Список внешних линий</h4>'
qwr = "SELECT * FROM ext_co_line ORDER by " + order
c.execute(qwr)
listHeader = '<table class="table_dark"><tr><th>№ п/п</th>\n' \
'<th>Линия</a></th>\n' \
'<th>Номер телефона</th>\n' \
'<th>Описание</th></tr>\n'
result = result + listHeader
for row in c.fetchall():
rowData = "<tr><td>%s</td><td>%s</td><td>%s</td><td>%s</td></tr>" % (row[0], row[1], row[2], row[3])
result = result + rowData
return (result + "</table>\n")
def ReportForm():
txtReportForm = '<h3 align=center>Отчёт по звонкам</h3><table><tr valign=top><td>' \
'<form><input type=hidden name="query_type" value="report">' \
'<fieldset class="report">' \
'<label>Номер<input type="text" name="int_number"></label>' \
'<label>Период с<input type="date" name="date_begin"></label>' \
'<label>По<input type="date" name="date_end"></label>' \
'</fieldset>' \
'<fieldset class="report-action">' \
'<input class="btn" type="submit" name="submit" value="Выбрать">' \
'</fieldset></form></td>'
return txtReportForm
def ReportData(environ):
paramDict = {item.split('=')[0]: item.split('=')[1] for item in environ['QUERY_STRING'].split('&')}
# проверка корректности параметров
# внутренний номер (3 цифры 100-999)
templateNumber = '(^[1-9][0-9][0-9]$)'
if re.match(templateNumber, str(paramDict.get('int_number'))) is not None:
numbers = paramDict.get('int_number')
else:
return ErrorMessage('Введите внутренний номер')
# проверка и преобразование дат в удобоваримый для субд формат
templateDate = '^(0[1-9]|[12][0-9]|3[01])[.](0[1-9]|1[012])[.](19|20)[0-9][0-9]$'
if re.match(templateDate, str(paramDict.get('date_begin'))) is not None:
date_begin = time.strftime("%Y-%m-%d", time.strptime(str(paramDict.get('date_begin')), "%d.%m.%Y"))
else:
return ErrorMessage('Неверный формат даты начала периода')
if re.match(templateDate, paramDict.get('date_end')) is not None:
date_end = time.strftime("%Y-%m-%d", time.strptime(str(paramDict.get('date_end')), "%d.%m.%Y"))
else:
return ErrorMessage('Неверный формат даты окончания периода')
conn = connectDB()
c = conn.cursor()
order = 'call_date'
result = '<td><h4 align=left>Список звонков с номера {}</h4>'.format(numbers)
qwr = "SELECT * FROM cdr where int_number=\'{}\' AND call_date BETWEEN CAST(\'{}\' AS DATE) AND CAST(\'{}\' AS DATE) LIMIT 1000".format(numbers, date_begin, date_end)
c.execute(qwr)
listHeader = '<table class="table_dark"><tr>' \
'<th>№ п/п</th><th>Дата</th><th>Время</th><th>Внут.номер</th><th>Внеш.линия</th><th>Вызываемый номер</th>' \
'<th>Ring</th><th>Длительность</th><th>АСС</th><th>Код звонка</th><th>Направление</th><tr>\n'
result = result + listHeader
for row in c.fetchall():
rowData = '<td>{}</td><td>{}</td><td>{}</td><td>{}</td><td>{}</td><td>{}</td><td>{}</td><td>{}</td><td>{}</td><td>{}</td><td>{}</td></tr>'\
.format(row[0], row[1], row[2], row[3], row[4], row[5], row[5], row[7], row[9], row[9], row[10])
result = result + rowData
return (result + "</table></td></tr></table>\n" + qwr)
def ErrorMessage(txt):
return ('<td width=100% align=center><font color=red>{}</font></td></tr></table>'.format(txt))
def connectDB():
c = pymysql.connect(
db='ats',
user='ats',
passwd='pass',
host='x.x.x.x',
charset='utf8')
return c