EMMA Coverage Report (generated Tue Jul 10 07:50:22 IST 2012)
[all classes][org.wso2.siddhi.core.stream.handler.window]

COVERAGE SUMMARY FOR SOURCE FILE [TimeBatchWindowHandler.java]

nameclass, %method, %block, %line, %
TimeBatchWindowHandler.java100% (1/1)100% (6/6)57%  (169/298)70%  (40/57)

COVERAGE BREAKDOWN BY CLASS AND METHOD

nameclass, %method, %block, %line, %
     
class TimeBatchWindowHandler100% (1/1)100% (6/6)57%  (169/298)70%  (40/57)
process (ComplexEvent): void 100% (1/1)27%  (17/64)50%  (5/10)
run (): void 100% (1/1)62%  (78/126)73%  (19/26)
sendRemoveEvents (List): void 100% (1/1)64%  (23/36)71%  (5/7)
sendInEvents (List): void 100% (1/1)65%  (22/34)71%  (5/7)
setParameters (Object []): void 100% (1/1)70%  (21/30)80%  (4/5)
TimeBatchWindowHandler (): void 100% (1/1)100% (8/8)100% (2/2)

1/*
2*  Copyright (c) 2005-2010, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
3*
4*  WSO2 Inc. licenses this file to you under the Apache License,
5*  Version 2.0 (the "License"); you may not use this file except
6*  in compliance with the License.
7*  You may obtain a copy of the License at
8*
9*    http://www.apache.org/licenses/LICENSE-2.0
10*
11* Unless required by applicable law or agreed to in writing,
12* software distributed under the License is distributed on an
13* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14* KIND, either express or implied.  See the License for the
15* specific language governing permissions and limitations
16* under the License.
17*/
18package org.wso2.siddhi.core.stream.handler.window;
19 
20import org.wso2.siddhi.core.event.ComplexEvent;
21import org.wso2.siddhi.core.event.Event;
22import org.wso2.siddhi.core.event.ListEvent;
23import org.wso2.siddhi.core.event.StreamEvent;
24import org.wso2.siddhi.core.event.in.InEvent;
25import org.wso2.siddhi.core.event.in.InListEvent;
26import org.wso2.siddhi.core.event.in.InStream;
27import org.wso2.siddhi.core.event.remove.RemoveEvent;
28import org.wso2.siddhi.core.event.remove.RemoveListEvent;
29import org.wso2.siddhi.core.event.remove.RemoveStream;
30 
31import java.util.ArrayList;
32import java.util.Collections;
33import java.util.List;
34import java.util.concurrent.TimeUnit;
35 
36public class TimeBatchWindowHandler extends WindowHandler implements Runnable {
37 
38    long timeToKeep;
39    List<Event> newEventList = new ArrayList<Event>();
40    List<Event> oldEventList;
41 
42    @Override
43    public void setParameters(Object[] parameters) {
44        if (parameters[0] instanceof Integer) {
45            timeToKeep = (Integer) parameters[0];
46        } else {
47            timeToKeep = (Long) parameters[0];
48        }
49        getEventRemoverScheduler().schedule(this, timeToKeep, TimeUnit.MILLISECONDS);
50    }
51 
52    @Override
53    public void process(ComplexEvent complexEvent) {
54        if (complexEvent instanceof StreamEvent) {
55            if (complexEvent instanceof InStream) {
56                if (complexEvent instanceof Event) {
57                    newEventList.add(((Event) complexEvent));
58                } else {//ListEvent
59                    Collections.addAll(newEventList, ((ListEvent) complexEvent).getEvents());
60                }
61            } else {
62                if (complexEvent instanceof Event) {
63                    newEventList.add(new InEvent((Event) complexEvent));
64                } else {//ListEvent
65                    for (Event aEvent : ((ListEvent) complexEvent).getEvents()) {
66                        newEventList.add(new InEvent(aEvent));
67                    }
68                }
69            }
70        }
71    }
72 
73    @Override
74    public void run() {
75        oldEventList = new ArrayList<Event>();
76        while (true) {
77            StreamEvent event = getWindow().poll();
78            if (event == null) {
79                getEventRemoverScheduler().schedule(this, timeToKeep, TimeUnit.MILLISECONDS);
80 
81                sendRemoveEvents(oldEventList);
82 
83                oldEventList = newEventList;
84                newEventList = new ArrayList<Event>();
85 
86                sendInEvents(oldEventList);
87 
88                for (Event aEvent : oldEventList) {
89                    try {
90                        getWindow().put(aEvent);
91                    } catch (InterruptedException e) {
92                        e.printStackTrace();
93                    }
94                }
95                oldEventList = null;
96 
97                break;
98            }
99            if (event instanceof RemoveStream) {
100                if (event instanceof Event) {
101                    oldEventList.add(((Event) event));
102                } else {//ListEvent
103                    Collections.addAll(oldEventList, ((ListEvent) event).getEvents());
104                }
105            } else {
106                if (event instanceof Event) {
107                    oldEventList.add(new RemoveEvent((Event) event,System.currentTimeMillis()));
108                } else {//ListEvent
109                    for (Event aEvent : ((ListEvent) event).getEvents()) {
110                        oldEventList.add(new RemoveEvent(aEvent,System.currentTimeMillis()));
111                    }
112                }
113            }
114        }
115    }
116 
117    private void sendInEvents(List<Event> eventList) {
118        int size = eventList.size();
119        if (size > 1) {
120            getNextPreStreamFlowProcessor().process(new InListEvent((eventList.toArray(new Event[size]))));
121        } else if (size == 1) {
122            Event aEvent = eventList.get(0);
123            getNextPreStreamFlowProcessor().process(new InEvent(aEvent));
124        }
125    }
126 
127 
128    private void sendRemoveEvents(List<Event> eventList) {
129        int size = eventList.size();
130        if (size > 1) {
131            getNextPreStreamFlowProcessor().process(new RemoveListEvent((eventList.toArray(new Event[size])), System.currentTimeMillis()));
132        } else if (size == 1) {
133            Event aEvent = eventList.get(0);
134            getNextPreStreamFlowProcessor().process(new RemoveEvent(aEvent, System.currentTimeMillis()));
135        }
136    }
137 
138 
139}

[all classes][org.wso2.siddhi.core.stream.handler.window]
EMMA 2.1.5320 (stable) (C) Vladimir Roubtsov