EMMA Coverage Report (generated Tue Jul 10 07:50:22 IST 2012)
[all classes][org.wso2.siddhi.core.stream.packer.join]

COVERAGE SUMMARY FOR SOURCE FILE [JoinStreamPacker.java]

nameclass, %method, %block, %line, %
JoinStreamPacker.java100% (1/1)83%  (5/6)29%  (62/215)39%  (20.5/52)

COVERAGE BREAKDOWN BY CLASS AND METHOD

nameclass, %method, %block, %line, %
     
class JoinStreamPacker100% (1/1)83%  (5/6)29%  (62/215)39%  (20.5/52)
sendEventList (List): void 0%   (0/1)0%   (0/22)0%   (0/6)
process (ComplexEvent): void 100% (1/1)24%  (41/172)31%  (11.5/37)
JoinStreamPacker (ConditionExecutor, boolean): void 100% (1/1)100% (9/9)100% (4/4)
getWindow (): SchedulerQueue 100% (1/1)100% (4/4)100% (1/1)
setNext (QueryProjector): void 100% (1/1)100% (4/4)100% (2/2)
setOppositeWindow (SchedulerQueue): void 100% (1/1)100% (4/4)100% (2/2)

1/*
2*  Copyright (c) 2005-2010, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
3*
4*  WSO2 Inc. licenses this file to you under the Apache License,
5*  Version 2.0 (the "License"); you may not use this file except
6*  in compliance with the License.
7*  You may obtain a copy of the License at
8*
9*    http://www.apache.org/licenses/LICENSE-2.0
10*
11* Unless required by applicable law or agreed to in writing,
12* software distributed under the License is distributed on an
13* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14* KIND, either express or implied.  See the License for the
15* specific language governing permissions and limitations
16* under the License.
17*/
18package org.wso2.siddhi.core.stream.packer.join;
19 
20import org.wso2.siddhi.core.event.AtomicEvent;
21import org.wso2.siddhi.core.event.ComplexEvent;
22import org.wso2.siddhi.core.event.Event;
23import org.wso2.siddhi.core.event.ListEvent;
24import org.wso2.siddhi.core.event.StreamEvent;
25import org.wso2.siddhi.core.event.in.InStream;
26import org.wso2.siddhi.core.event.in.StateEvent;
27import org.wso2.siddhi.core.executor.conditon.ConditionExecutor;
28import org.wso2.siddhi.core.util.SchedulerQueue;
29import org.wso2.siddhi.core.projector.QueryProjector;
30import org.wso2.siddhi.core.stream.packer.SingleStreamPacker;
31 
32import java.util.ArrayList;
33import java.util.Iterator;
34import java.util.List;
35 
36public abstract class JoinStreamPacker extends SingleStreamPacker {
37 
38    public SchedulerQueue<StreamEvent> getWindow() {
39        return prevStreamElement.getWindow();
40    }
41 
42    protected SchedulerQueue<StreamEvent> window;
43    protected ConditionExecutor onConditionExecutor;
44    protected boolean triggerEvent;
45    //    private int nextState;
46 
47 
48    public JoinStreamPacker(ConditionExecutor onConditionExecutor, boolean triggerEvent) {
49        this.onConditionExecutor = onConditionExecutor;
50        this.triggerEvent=triggerEvent;
51    }
52 
53    @Override
54    public void setNext(QueryProjector queryProjector) {
55        this.queryProjector = queryProjector;
56//        this.nextState = state;
57    }
58 
59    //written in thinking of LEFT
60    // RIGHT is handled in the #createNewEvent()
61    @Override
62    public void process(ComplexEvent complexEvent) {
63//        //System.out.println("Arrived");
64 
65        if (triggerEvent && complexEvent instanceof InStream) {
66            if (complexEvent instanceof Event) {
67                Iterator<StreamEvent> iterator = window.iterator();
68                while (iterator.hasNext()) {
69                    ComplexEvent windowComplexEvent = iterator.next();
70                    if (windowComplexEvent instanceof Event) {
71//                        Event newEvent = (new InComplexEvent(new Event[]{((Event) complexEvent), ((Event) windowComplexEvent)}));
72                        StateEvent newEvent = createNewEvent(complexEvent, windowComplexEvent);
73                        if (onConditionExecutor.execute(newEvent)) {
74                            queryProjector.process(newEvent);
75                        }
76                    } else if (windowComplexEvent instanceof ListEvent) {
77                        List<AtomicEvent> list = new ArrayList<AtomicEvent>();
78                        Event[] events = ((ListEvent) windowComplexEvent).getEvents();
79                        for (int i = 0, eventsLength = events.length; i < eventsLength; i++) {
80//                            Event newEvent = (new InComplexEvent(new Event[]{((Event) complexEvent), ((Event) events[i])}));
81                            StateEvent newEvent = createNewEvent(complexEvent, events[i]);
82                            if (onConditionExecutor.execute(newEvent)) {
83                                list.add(newEvent);
84                            }
85                        }
86                        sendEventList(list);
87                    } else {
88                        //todo error Complex event not supported
89                    }
90                }
91            } else if (complexEvent instanceof ListEvent) {
92                List<AtomicEvent> list = new ArrayList<AtomicEvent>();
93                Iterator<StreamEvent> iterator = window.iterator();
94                for (Event event : ((ListEvent) complexEvent).getEvents()) {
95                    while (iterator.hasNext()) {
96                        ComplexEvent windowComplexEvent = iterator.next();
97                        if (windowComplexEvent instanceof Event) {
98//                            Event newEvent = (new InComplexEvent(new Event[]{((Event) event), ((Event) windowComplexEvent)}));
99                            StateEvent newEvent = createNewEvent(event, windowComplexEvent);
100                            if (onConditionExecutor.execute(newEvent)) {
101                                list.add(newEvent);
102                            }
103                        } else if (windowComplexEvent instanceof ListEvent) {
104                            Event[] events = ((ListEvent) windowComplexEvent).getEvents();
105                            for (int i = 0, eventsLength = events.length; i < eventsLength; i++) {
106                                StateEvent newEvent = createNewEvent(event, events[i]);
107                                if (onConditionExecutor.execute(newEvent)) {
108                                    list.add(newEvent);
109                                }
110                            }
111                        } else {
112                            //todo error Complex event not supported
113                        }
114                    }
115                }
116                sendEventList(list);
117            }
118        }
119 
120    }
121 
122    protected abstract StateEvent createNewEvent(ComplexEvent complexEvent,
123                                                 ComplexEvent complexEvent1);
124 
125    protected void sendEventList(List<AtomicEvent> list) {
126        int size = list.size();
127//        if (size > 1) {
128//            queryProjector.process((list.toArray(new AtomicEvent[list.size()])));
129//        } else if (size == 1) {
130//            queryProjector.process(list.get(0));
131//        }
132        if (size > 1) {
133            queryProjector.process(list);
134        } else if (size == 1) {
135            queryProjector.process(list.get(0));
136        }
137    }
138 
139    public void setOppositeWindow(SchedulerQueue<StreamEvent> window) {
140        this.window = window;
141    }
142 
143}

[all classes][org.wso2.siddhi.core.stream.packer.join]
EMMA 2.1.5320 (stable) (C) Vladimir Roubtsov