EMMA Coverage Report (generated Tue Jul 10 07:50:22 IST 2012)
[all classes][org.wso2.siddhi.core.stream.output]

COVERAGE SUMMARY FOR SOURCE FILE [Callback.java]

nameclass, %method, %block, %line, %
Callback.java100% (1/1)88%  (7/8)84%  (223/264)84%  (38/45)

COVERAGE BREAKDOWN BY CLASS AND METHOD

nameclass, %method, %block, %line, %
     
class Callback100% (1/1)88%  (7/8)84%  (223/264)84%  (38/45)
getStreamId (): String 0%   (0/1)0%   (0/3)0%   (0/1)
toString (long, Object [], Object [], Object []): String 100% (1/1)82%  (134/163)88%  (15/17)
run (): void 100% (1/1)86%  (54/63)76%  (13/17)
Callback (): void 100% (1/1)100% (8/8)100% (2/2)
getData (Object [], int, int): Object 100% (1/1)100% (8/8)100% (1/1)
receive (StreamEvent): void 100% (1/1)100% (11/11)100% (3/3)
setStreamId (String): void 100% (1/1)100% (4/4)100% (2/2)
setThreadPoolExecutor (ThreadPoolExecutor): void 100% (1/1)100% (4/4)100% (2/2)

1/*
2*  Copyright (c) 2005-2010, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
3*
4*  WSO2 Inc. licenses this file to you under the Apache License,
5*  Version 2.0 (the "License"); you may not use this file except
6*  in compliance with the License.
7*  You may obtain a copy of the License at
8*
9*    http://www.apache.org/licenses/LICENSE-2.0
10*
11* Unless required by applicable law or agreed to in writing,
12* software distributed under the License is distributed on an
13* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14* KIND, either express or implied.  See the License for the
15* specific language governing permissions and limitations
16* under the License.
17*/
18package org.wso2.siddhi.core.stream.output;
19 
20import org.wso2.siddhi.core.event.Event;
21import org.wso2.siddhi.core.event.StreamEvent;
22import org.wso2.siddhi.core.event.in.InStream;
23import org.wso2.siddhi.core.event.remove.RemoveStream;
24import org.wso2.siddhi.core.stream.recevier.StreamReceiver;
25import org.wso2.siddhi.core.util.SchedulerQueue;
26 
27import java.util.Arrays;
28import java.util.concurrent.ThreadPoolExecutor;
29 
30public abstract class Callback implements Runnable, StreamReceiver {
31 
32    private SchedulerQueue<StreamEvent> inputQueue = new SchedulerQueue<StreamEvent>();
33    private ThreadPoolExecutor threadPoolExecutor;
34    private String streamId;
35 
36    public void setThreadPoolExecutor(ThreadPoolExecutor threadPoolExecutor) {
37        this.threadPoolExecutor = threadPoolExecutor;
38    }
39 
40    public String toString(long timeStamp, Object[] newEventData, Object[] removeEventData,
41                           Object[] faultEventData) {
42        String value = "Events{" +
43                       " @timeStamp=" + timeStamp +
44                       ", newEventData=";
45        if (newEventData == null) {
46            value += null;
47        } else {
48            for (Object data : newEventData) {
49                value += Arrays.asList((Object[]) data).toString();
50            }
51        }
52        value += ", removeEventData=";
53        if (removeEventData == null) {
54            value += null;
55        } else {
56            for (Object data : removeEventData) {
57                value += Arrays.asList((Object[]) data).toString();
58            }
59        }
60        value += ", faultEventData=";
61        if (faultEventData == null) {
62            value += null;
63        } else {
64            for (Object data : faultEventData) {
65                value += Arrays.asList((Object[]) data).toString();
66            }
67        }
68        value += '}';
69        return value;
70    }
71 
72    public Object getData(Object[] eventData, int eventPosition, int dataPosition) {
73        return ((Object[]) eventData[eventPosition])[dataPosition];
74    }
75 
76    public void receive(StreamEvent event) throws InterruptedException {
77        if (!inputQueue.put(event)) {
78            threadPoolExecutor.submit(this);
79        }
80    }
81 
82    @Override
83    public void run() {
84        int eventCounter = 0;
85        while (true) {
86            StreamEvent event = inputQueue.poll();
87            if (event == null) {
88                break;
89            } else if (eventCounter > 10) {
90                threadPoolExecutor.submit(this);
91                break;
92            }
93            if (event instanceof Event) {
94                try {
95                    if (event instanceof InStream) {
96                        receive(event.getTimeStamp(), new Object[]{((Event) event).getData()}, null, null);
97                    } else if (event instanceof RemoveStream) {
98                        receive(event.getTimeStamp(), null, new Object[]{((Event) event).getData()}, null);
99                    }
100                } catch (Throwable e) {
101                    e.printStackTrace();
102                }
103            }
104        }
105    }
106 
107    public abstract void receive(long timeStamp, Object[] newEventData,
108                                 Object[] removeEventData, Object[] faultEventData);
109 
110    public void setStreamId(String streamId) {
111        this.streamId = streamId;
112    }
113 
114    public String getStreamId() {
115        return streamId;
116    }
117}

[all classes][org.wso2.siddhi.core.stream.output]
EMMA 2.1.5320 (stable) (C) Vladimir Roubtsov