Skip to content

Commit

Permalink
Merge pull request #7 from Sheshuk/development
Browse files Browse the repository at this point in the history
v0.8
* Fix bugs in `SmartAlert`
* Saving detector ID for the cluster
* Remove excessive prints
  • Loading branch information
Sheshuk committed Sep 23, 2021
2 parents fbc3c95 + 5b7627b commit 8d9f126
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 78 deletions.
2 changes: 1 addition & 1 deletion snap/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version_combine__="0.7"
__version_combine__="0.8"
3 changes: 1 addition & 2 deletions snap/elements/process/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@
Processing and analyzing incoming data
========================================================================
.. autoclass:: snap.elements.process.Threshold
.. autoclass:: snap.elements.process.SmartAlert
:members:
"""
from .threshold import Threshold
from .smart_alert import SmartAlert
29 changes: 17 additions & 12 deletions snap/elements/process/smart_alert.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class SmartAlert:
"""A precessing :term:`step` for detecting the parts (clusters) of the time series above threshold,
bookkeeping of these parts and producing 'NEW/UPD/DEL' commands for these clusters.
"""
clu_id = 0
alert_id = 0
def __init__(self, threshold:float=5, timeout:float=600):
"""
Args:
Expand Down Expand Up @@ -55,13 +55,12 @@ def __init__(self, threshold:float=5, timeout:float=600):

def drop_tail(self):
t_drop = self.data.T1()-self.timeout

#drop obsolete clusters
self.clusters = [c for c in self.clusters if(c.T1()>=t_drop)]
#shift drop time to avoid cutting existing clusters
for c in self.clusters:
if t_drop in c:
t_drop = c.T0()
#drop obsolete clusters
self.clusters = [c for c in self.clusters if(c.T1()>=t_drop)]
#cut the data
self.data = self.data.drop_tail(t_drop)

Expand All @@ -70,12 +69,13 @@ async def put(self, data: DataBlock):
self.data = data
else:
self.data = self.data.update(data)
self.det_id = data.id

self.drop_tail()
clusters = find_clusters(self.data, self.thr)
res = self.update_clusters(clusters)
for method,clus in res.items():
for c in clus:
print(method,c)
await self.queue.put((method,c))

async def get(self):
Expand All @@ -102,25 +102,30 @@ def collides(d0,d1):
def maxz(d):
return d.zs.max()

for c in clusters+self.clusters:
c.det_id = self.det_id
old_clusters = sorted(self.clusters, key=maxz, reverse=True)
new_clusters = sorted(clusters, key=maxz, reverse=True)

to_upd = []
to_old = []
to_new = clusters
to_del = self.clusters
for c0 in sorted(self.clusters, key=maxz, reverse=True):
for c1 in sorted(clusters, key=maxz, reverse=True):
to_new = new_clusters
to_del = old_clusters
for c0 in list(to_del):
for c1 in list(to_new):
if collides(c0,c1):
c1.id = c0.id
to_del.remove(c0)
to_new.remove(c1)
c1.id = c0.id
if c1!=c0:
to_upd.append(c1)
else:
to_old.append(c1)
break
#set IDs to new clusters
for c in to_new:
self.clu_id+=1
c.id = self.clu_id
self.alert_id+=1
c.id = self.alert_id
self.clusters = to_old+to_new+to_upd
return {'UPD': to_upd, 'DEL': to_del, 'NEW': to_new}

52 changes: 0 additions & 52 deletions snap/elements/process/threshold.py

This file was deleted.

34 changes: 23 additions & 11 deletions test/test_smartalert.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,18 @@ async def test_cluster_rewritten_by_second_data_gives_DEL():
with pytest.raises(asyncio.TimeoutError):
await get(sa)

@pytest.mark.asyncio
async def test_cluster_repeating_same_data_gives_nothing():
sa = SmartAlert(threshold=0, timeout=600)
d0 = DataBlock(ts=np.arange(11),zs=[0,0,0,1,1,1,1,1,0,0])
await sa.put(d0)
assert await get(sa) == ("NEW",DataBlock(id=1,ts=range(3,9),zs=[1,1,1,1,1]))
with pytest.raises(asyncio.TimeoutError):
await get(sa)
await sa.put(d0)
with pytest.raises(asyncio.TimeoutError):
await get(sa)

async def dump_all(obj):
print('--beg')
while True:
Expand All @@ -69,8 +81,8 @@ async def test_two_clusters_in_one_block():
sa = SmartAlert(threshold=0, timeout=600)
d0 = DataBlock(ts=np.arange(0, 11),zs=[0,1,1,1,0,0,2,2,2,0])
await sa.put(d0)
assert await get(sa) == ("NEW",DataBlock(id=1,ts=[1,2,3,4], zs=[1,1,1]))
assert await get(sa) == ("NEW",DataBlock(id=2,ts=[6,7,8,9], zs=[2,2,2]))
assert await get(sa) == ("NEW",DataBlock(id=1,ts=[6,7,8,9], zs=[2,2,2]))
assert await get(sa) == ("NEW",DataBlock(id=2,ts=[1,2,3,4], zs=[1,1,1]))

with pytest.raises(asyncio.TimeoutError):
await get(sa)
Expand All @@ -82,11 +94,11 @@ async def test_two_clusters_merge_by_z_order():
d1 = DataBlock(ts=np.arange(0, 11),zs=[0,0,0,2,2,2,2,0,1,1])
await sa.put(d0)
await sa.put(d1)
assert await get(sa) == ("NEW",DataBlock(id=1,ts=[1,2,3,4], zs=[1,1,1]))
assert await get(sa) == ("NEW",DataBlock(id=2,ts=[6,7,8,9], zs=[2,2,2]))
assert await get(sa) == ("UPD",DataBlock(id=2,ts=[3,4,5,6,7], zs=[2,2,2,2]))
assert await get(sa) == ("DEL",DataBlock(id=1,ts=[1,2,3,4], zs=[1,1,1]))
assert await get(sa) == ("NEW",DataBlock(id=1,ts=[8,9,10], zs=[1,1]))
assert await get(sa) == ("NEW",DataBlock(id=1,ts=[6,7,8,9], zs=[2,2,2]))
assert await get(sa) == ("NEW",DataBlock(id=2,ts=[1,2,3,4], zs=[1,1,1]))
assert await get(sa) == ("UPD",DataBlock(id=1,ts=[3,4,5,6,7], zs=[2,2,2,2]))
assert await get(sa) == ("DEL",DataBlock(id=2,ts=[1,2,3,4], zs=[1,1,1]))
assert await get(sa) == ("NEW",DataBlock(id=3,ts=[8,9,10], zs=[1,1]))

with pytest.raises(asyncio.TimeoutError):
await get(sa)
Expand All @@ -98,10 +110,10 @@ async def test_two_clusters_merge_to_two():
d1 = DataBlock(ts=np.arange(0, 11),zs=[0,0,0,1,1,1,1,0,2,2])
await sa.put(d0)
await sa.put(d1)
assert await get(sa) == ("NEW",DataBlock(id=1,ts=[1,2,3,4], zs=[1,1,1]))
assert await get(sa) == ("NEW",DataBlock(id=2,ts=[6,7,8,9], zs=[2,2,2]))
assert await get(sa) == ("UPD",DataBlock(id=2,ts=[8,9,10], zs=[1,1]))
assert await get(sa) == ("UPD",DataBlock(id=1,ts=[3,4,5,6,7], zs=[1,1,1,1]))
assert await get(sa) == ("NEW",DataBlock(id=1,ts=[6,7,8,9], zs=[2,2,2]))
assert await get(sa) == ("NEW",DataBlock(id=2,ts=[1,2,3,4], zs=[1,1,1]))
assert await get(sa) == ("UPD",DataBlock(id=1,ts=[8,9,10], zs=[2,2]))
assert await get(sa) == ("UPD",DataBlock(id=2,ts=[3,4,5,6,7], zs=[1,1,1,1]))

with pytest.raises(asyncio.TimeoutError):
await get(sa)
Expand Down

0 comments on commit 8d9f126

Please sign in to comment.