Fixed some issues, store dates in utc.
This commit is contained in:
parent
6230d8392c
commit
4c010f2174
46
collector.py
46
collector.py
@ -1,17 +1,20 @@
|
|||||||
|
import logging
|
||||||
|
import signal
|
||||||
|
from datetime import datetime, timedelta
|
||||||
|
from threading import Event
|
||||||
|
|
||||||
|
from apscheduler.schedulers.blocking import BlockingScheduler
|
||||||
|
|
||||||
import config
|
import config
|
||||||
import database
|
import database
|
||||||
import signal
|
|
||||||
from apscheduler.schedulers.blocking import BlockingScheduler
|
|
||||||
from threading import Event
|
|
||||||
from plugins.system.cpu_plugin import CpuPlugin
|
|
||||||
from plugins.system.memory_plugin import MemoryPlugin
|
|
||||||
from plugins.system.disk_plugin import DiskIOPlugin, DiskUsagePlugin
|
|
||||||
from plugins.system.network_plugin import NetworkPlugin
|
|
||||||
from plugins.system.temperatures_plugin import TemperaturesPlugin
|
|
||||||
from plugins.system.ping_plugin import PingPlugin
|
|
||||||
from plugins.finance.stocks_plugin import StocksPlugin
|
|
||||||
from plugins.finance.robor_plugin import RoborPlugin
|
from plugins.finance.robor_plugin import RoborPlugin
|
||||||
import logging
|
from plugins.finance.stocks_plugin import StocksPlugin
|
||||||
|
from plugins.system.cpu_plugin import CpuPlugin
|
||||||
|
from plugins.system.disk_plugin import DiskIOPlugin, DiskUsagePlugin
|
||||||
|
from plugins.system.memory_plugin import MemoryPlugin
|
||||||
|
from plugins.system.network_plugin import NetworkPlugin
|
||||||
|
from plugins.system.ping_plugin import PingPlugin
|
||||||
|
from plugins.system.temperatures_plugin import TemperaturesPlugin
|
||||||
|
|
||||||
|
|
||||||
class Collector(object):
|
class Collector(object):
|
||||||
@ -41,8 +44,11 @@ class Collector(object):
|
|||||||
return models
|
return models
|
||||||
|
|
||||||
def schedule_plugins(self):
|
def schedule_plugins(self):
|
||||||
|
start_date = datetime.now() + timedelta(seconds=10)
|
||||||
for plugin in self.plugins:
|
for plugin in self.plugins:
|
||||||
self.scheduler.add_job(plugin.execute, 'interval', seconds=plugin.get_interval())
|
self.scheduler.add_job(plugin.execute, 'interval',
|
||||||
|
seconds=plugin.get_interval(),
|
||||||
|
start_date=start_date)
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
logging.basicConfig()
|
logging.basicConfig()
|
||||||
@ -53,22 +59,18 @@ class Collector(object):
|
|||||||
database.DB.create_tables(models)
|
database.DB.create_tables(models)
|
||||||
|
|
||||||
self.schedule_plugins()
|
self.schedule_plugins()
|
||||||
# signal.signal(signal.SIGHUP, self.abort)
|
logging.info('Started.')
|
||||||
# signal.signal(signal.SIGTERM, self.abort)
|
|
||||||
# signal.signal(signal.SIGINT, self.abort)
|
|
||||||
print(f'Started.')
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
self.scheduler.start()
|
self.scheduler.start()
|
||||||
except (KeyboardInterrupt, SystemExit):
|
except (KeyboardInterrupt, SystemExit):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
print(f'Stopped.')
|
logging.info(f'Stopped.')
|
||||||
|
|
||||||
def abort(self, signum, frame):
|
|
||||||
print(f'Received signal {signum}, aborting...')
|
|
||||||
self.event.set()
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
Collector().run()
|
try:
|
||||||
|
Collector().run()
|
||||||
|
except BaseException as ex:
|
||||||
|
logging.critical("Unhandled exception.", exc_info=ex)
|
||||||
|
@ -19,7 +19,7 @@ DEFAULT_INTERVAL = 30
|
|||||||
# will create a PooledMySQLDatabase instance for the local MySQL database my_db with max_connections set to 20 and a
|
# will create a PooledMySQLDatabase instance for the local MySQL database my_db with max_connections set to 20 and a
|
||||||
# stale_timeout setting of 300 seconds.
|
# stale_timeout setting of 300 seconds.
|
||||||
|
|
||||||
DATABASE_URL = 'sqlite:///data.db'
|
DATABASE_URL = 'postgresql://system_metrics_collector:theMetrixWriteer2123@localhost:5432/system_metrics'
|
||||||
|
|
||||||
|
|
||||||
# Plugin configuration
|
# Plugin configuration
|
||||||
@ -55,7 +55,8 @@ PING_HOSTS = [
|
|||||||
'192.168.0.1',
|
'192.168.0.1',
|
||||||
'1.1.1.1',
|
'1.1.1.1',
|
||||||
'google.com',
|
'google.com',
|
||||||
'bing.com'
|
'bing.com',
|
||||||
|
'tibich.com'
|
||||||
]
|
]
|
||||||
|
|
||||||
### Stocks
|
### Stocks
|
||||||
|
@ -11,7 +11,7 @@ from plugins.plugin import Plugin
|
|||||||
|
|
||||||
|
|
||||||
class Robor(BaseModel):
|
class Robor(BaseModel):
|
||||||
date = DateField(index=True, default=datetime.date.today(), null=False)
|
date = DateField(index=True, default=datetime.date.today, null=False)
|
||||||
field = TextField(null=False)
|
field = TextField(null=False)
|
||||||
value = FloatField(null=False)
|
value = FloatField(null=False)
|
||||||
|
|
||||||
|
@ -10,7 +10,7 @@ import yfinance as yf
|
|||||||
|
|
||||||
|
|
||||||
class Stocks(BaseModel):
|
class Stocks(BaseModel):
|
||||||
date = DateTimeField(index=True, default=datetime.datetime.now(), null=False)
|
date = DateTimeField(index=True, default=datetime.datetime.utcnow, null=False)
|
||||||
ticker = TextField(null=False)
|
ticker = TextField(null=False)
|
||||||
label = TextField(null=False)
|
label = TextField(null=False)
|
||||||
value_open = FloatField(null=False)
|
value_open = FloatField(null=False)
|
||||||
@ -29,6 +29,7 @@ class StocksPlugin(Plugin):
|
|||||||
for ticker, label in config.STOCKS_TICKERS.items():
|
for ticker, label in config.STOCKS_TICKERS.items():
|
||||||
# Get last existing date
|
# Get last existing date
|
||||||
latest_date = Stocks.select(Stocks.date) \
|
latest_date = Stocks.select(Stocks.date) \
|
||||||
|
.where(Stocks.ticker == ticker) \
|
||||||
.order_by(Stocks.date.desc()) \
|
.order_by(Stocks.date.desc()) \
|
||||||
.limit(1) \
|
.limit(1) \
|
||||||
.scalar()
|
.scalar()
|
||||||
@ -51,6 +52,6 @@ class StocksPlugin(Plugin):
|
|||||||
entry.value_high = row.High
|
entry.value_high = row.High
|
||||||
entry.value_low = row.Low
|
entry.value_low = row.Low
|
||||||
entry.save()
|
entry.save()
|
||||||
print(model_to_dict(entry))
|
|
||||||
except BaseException as e:
|
except BaseException as e:
|
||||||
print(e)
|
print(e)
|
@ -10,7 +10,7 @@ from plugins.plugin import Plugin
|
|||||||
|
|
||||||
|
|
||||||
class Cpu(BaseModel):
|
class Cpu(BaseModel):
|
||||||
time = DateTimeField(index=True, default=datetime.now)
|
time = DateTimeField(index=True, default=datetime.utcnow)
|
||||||
cpu = SmallIntegerField(null=True)
|
cpu = SmallIntegerField(null=True)
|
||||||
idle_pct = FloatField(null=False)
|
idle_pct = FloatField(null=False)
|
||||||
user_pct = FloatField(null=False)
|
user_pct = FloatField(null=False)
|
||||||
|
@ -11,7 +11,7 @@ from plugins.plugin import Plugin
|
|||||||
|
|
||||||
|
|
||||||
class DiskUsage(BaseModel):
|
class DiskUsage(BaseModel):
|
||||||
time = DateTimeField(index=True, default=datetime.now)
|
time = DateTimeField(index=True, default=datetime.utcnow)
|
||||||
partition = TextField(null=False)
|
partition = TextField(null=False)
|
||||||
mountpoint = TextField(null=False)
|
mountpoint = TextField(null=False)
|
||||||
total = BigIntegerField(null=False)
|
total = BigIntegerField(null=False)
|
||||||
@ -20,7 +20,7 @@ class DiskUsage(BaseModel):
|
|||||||
|
|
||||||
|
|
||||||
class DiskIO(BaseModel):
|
class DiskIO(BaseModel):
|
||||||
time = DateTimeField(index=True, default=datetime.now)
|
time = DateTimeField(index=True, default=datetime.utcnow)
|
||||||
disk = TextField(null=True)
|
disk = TextField(null=True)
|
||||||
read_count = FloatField(null=False) # all values are per second
|
read_count = FloatField(null=False) # all values are per second
|
||||||
write_count = FloatField(null=False)
|
write_count = FloatField(null=False)
|
||||||
|
@ -9,7 +9,7 @@ from plugins.plugin import Plugin
|
|||||||
|
|
||||||
|
|
||||||
class Memory(BaseModel):
|
class Memory(BaseModel):
|
||||||
time = DateTimeField(index=True, default=datetime.now)
|
time = DateTimeField(index=True, default=datetime.utcnow)
|
||||||
total = BigIntegerField(null=False)
|
total = BigIntegerField(null=False)
|
||||||
available = BigIntegerField(null=False)
|
available = BigIntegerField(null=False)
|
||||||
used = BigIntegerField(null=False)
|
used = BigIntegerField(null=False)
|
||||||
|
@ -11,7 +11,7 @@ from plugins.plugin import Plugin
|
|||||||
|
|
||||||
|
|
||||||
class NetworkIO(BaseModel):
|
class NetworkIO(BaseModel):
|
||||||
time = DateTimeField(index=True, default=datetime.now)
|
time = DateTimeField(index=True, default=datetime.utcnow)
|
||||||
nic = TextField(null=True)
|
nic = TextField(null=True)
|
||||||
packets_sent = FloatField(null=False) # all values are per second
|
packets_sent = FloatField(null=False) # all values are per second
|
||||||
packets_recv = FloatField(null=False)
|
packets_recv = FloatField(null=False)
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
import asyncio
|
import subprocess
|
||||||
import re
|
import re
|
||||||
import subprocess
|
import subprocess
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
@ -13,7 +13,7 @@ from plugins.plugin import Plugin
|
|||||||
|
|
||||||
|
|
||||||
class Ping(BaseModel):
|
class Ping(BaseModel):
|
||||||
time = DateTimeField(index=True, default=datetime.now)
|
time = DateTimeField(index=True, default=datetime.utcnow)
|
||||||
host = TextField(null=False)
|
host = TextField(null=False)
|
||||||
ping = FloatField(null=True) # null = timeout or error
|
ping = FloatField(null=True) # null = timeout or error
|
||||||
|
|
||||||
@ -22,18 +22,15 @@ class PingPlugin(Plugin):
|
|||||||
models = [Ping]
|
models = [Ping]
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.__timeout = config.PING_INTERVAL // 3
|
self.__timeout = 20
|
||||||
|
|
||||||
def get_interval(self):
|
def get_interval(self):
|
||||||
return config.PING_INTERVAL
|
return config.PING_INTERVAL
|
||||||
|
|
||||||
async def do_ping(self, host):
|
def do_ping(self, host):
|
||||||
command = ['ping', '-c', '1', '-W', str(self.__timeout), host]
|
command = ['ping', '-c', '1', '-W', str(self.__timeout), host]
|
||||||
proc = await asyncio.create_subprocess_shell(' '.join(command),
|
proc = subprocess.run(command, stdout=subprocess.PIPE)
|
||||||
stdout=asyncio.subprocess.PIPE)
|
stdout = proc.stdout.decode()
|
||||||
|
|
||||||
stdout,_ = await proc.communicate()
|
|
||||||
stdout = stdout.decode()
|
|
||||||
|
|
||||||
entry = Ping()
|
entry = Ping()
|
||||||
entry.host = host
|
entry.host = host
|
||||||
@ -45,16 +42,6 @@ class PingPlugin(Plugin):
|
|||||||
|
|
||||||
entry.save()
|
entry.save()
|
||||||
|
|
||||||
async def execute_internal(self):
|
|
||||||
await asyncio.gather(*[self.do_ping(host) for host in config.PING_HOSTS])
|
|
||||||
|
|
||||||
def execute(self):
|
def execute(self):
|
||||||
if getattr(asyncio, 'run', None) is not None:
|
for host in config.PING_HOSTS:
|
||||||
# Python 3.7+
|
self.do_ping(host)
|
||||||
asyncio.run(self.execute_internal())
|
|
||||||
else:
|
|
||||||
loop = asyncio.get_event_loop()
|
|
||||||
loop.run_until_complete(self.execute_internal())
|
|
||||||
loop.close()
|
|
||||||
|
|
||||||
|
|
||||||
|
@ -11,7 +11,7 @@ from plugins.plugin import Plugin
|
|||||||
|
|
||||||
|
|
||||||
class Temperatures(BaseModel):
|
class Temperatures(BaseModel):
|
||||||
time = DateTimeField(index=True, default=datetime.now)
|
time = DateTimeField(index=True, default=datetime.utcnow)
|
||||||
sensor = TextField(null=False)
|
sensor = TextField(null=False)
|
||||||
sensor_label = TextField(null=False)
|
sensor_label = TextField(null=False)
|
||||||
current = FloatField(null=False) # all values are per second
|
current = FloatField(null=False) # all values are per second
|
||||||
|
@ -1,6 +1,9 @@
|
|||||||
apscheduler
|
apscheduler
|
||||||
peewee
|
peewee
|
||||||
|
|
||||||
|
# used for databases
|
||||||
|
psycopg2-binary
|
||||||
|
|
||||||
# Used in most system plugins
|
# Used in most system plugins
|
||||||
psutil
|
psutil
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user