| 1 | /* |
| 2 | * Copyright (c) 2005-2010, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. |
| 3 | * |
| 4 | * WSO2 Inc. licenses this file to you under the Apache License, |
| 5 | * Version 2.0 (the "License"); you may not use this file except |
| 6 | * in compliance with the License. |
| 7 | * You may obtain a copy of the License at |
| 8 | * |
| 9 | * http://www.apache.org/licenses/LICENSE-2.0 |
| 10 | * |
| 11 | * Unless required by applicable law or agreed to in writing, |
| 12 | * software distributed under the License is distributed on an |
| 13 | * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| 14 | * KIND, either express or implied. See the License for the |
| 15 | * specific language governing permissions and limitations |
| 16 | * under the License. |
| 17 | */ |
| 18 | package org.wso2.siddhi.core; |
| 19 | |
| 20 | import org.wso2.siddhi.core.event.in.StateEvent; |
| 21 | import org.wso2.siddhi.core.exception.EventStreamAlreadyExistException; |
| 22 | import org.wso2.siddhi.core.projector.QueryProjector; |
| 23 | import org.wso2.siddhi.core.stream.StreamJunction; |
| 24 | import org.wso2.siddhi.core.stream.output.Callback; |
| 25 | import org.wso2.siddhi.core.stream.input.InputHandler; |
| 26 | import org.wso2.siddhi.core.stream.recevier.StreamReceiver; |
| 27 | import org.wso2.siddhi.core.util.parser.StreamParser; |
| 28 | import org.wso2.siddhi.query.api.ExecutionPlan; |
| 29 | import org.wso2.siddhi.query.api.definition.Attribute; |
| 30 | import org.wso2.siddhi.query.api.definition.StreamDefinition; |
| 31 | import org.wso2.siddhi.query.api.expression.Expression; |
| 32 | import org.wso2.siddhi.query.api.query.Query; |
| 33 | import org.wso2.siddhi.query.api.query.QueryEventStream; |
| 34 | import org.wso2.siddhi.query.api.query.projection.attribute.OutputAttribute; |
| 35 | import org.wso2.siddhi.query.api.query.projection.attribute.SimpleAttribute; |
| 36 | import org.wso2.siddhi.query.compiler.SiddhiCompiler; |
| 37 | import org.wso2.siddhi.query.compiler.exception.SiddhiPraserException; |
| 38 | |
| 39 | import java.util.ArrayList; |
| 40 | import java.util.HashMap; |
| 41 | import java.util.List; |
| 42 | import java.util.Map; |
| 43 | import java.util.concurrent.LinkedBlockingQueue; |
| 44 | import java.util.concurrent.ThreadPoolExecutor; |
| 45 | import java.util.concurrent.TimeUnit; |
| 46 | |
| 47 | public class SiddhiManager { |
| 48 | |
| 49 | private ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), |
| 50 | Integer.MAX_VALUE, |
| 51 | 50, |
| 52 | TimeUnit.MICROSECONDS, |
| 53 | new LinkedBlockingQueue<Runnable>()); |
| 54 | Map<String, StreamJunction> streamJunctionMap = new HashMap<String, StreamJunction>(); //contains definition |
| 55 | Map<String, StreamDefinition> streamDefinitionMap = new HashMap<String, StreamDefinition>(); //contains definition |
| 56 | List<Query> queryList = new ArrayList<Query>(); |
| 57 | LinkedBlockingQueue<StateEvent> inputQueue = new LinkedBlockingQueue<StateEvent>(); |
| 58 | |
| 59 | public void defineStream(StreamDefinition streamDefinition) { |
| 60 | checkEventStream(streamDefinition); |
| 61 | StreamJunction streamJunction = new StreamJunction(streamDefinition); |
| 62 | streamJunctionMap.put(streamDefinition.getStreamId(), streamJunction); |
| 63 | streamDefinitionMap.put(streamDefinition.getStreamId(), streamDefinition); |
| 64 | } |
| 65 | |
| 66 | public void defineStream(String streamDefinition) throws SiddhiPraserException { |
| 67 | defineStream(SiddhiCompiler.parseStreamDefinition(streamDefinition)); |
| 68 | } |
| 69 | |
| 70 | private void checkEventStream(StreamDefinition newStreamDefinition) { |
| 71 | StreamDefinition streamDefinition = streamDefinitionMap.get(newStreamDefinition.getStreamId()); |
| 72 | if (streamDefinition != null) { |
| 73 | if (!streamDefinition.getAttributeList().equals(newStreamDefinition.getAttributeList())) { |
| 74 | throw new EventStreamAlreadyExistException(newStreamDefinition.getStreamId() + " is already defined as " + streamDefinition); |
| 75 | } |
| 76 | } |
| 77 | } |
| 78 | |
| 79 | public void addQuery(String query) throws SiddhiPraserException { |
| 80 | addQuery(SiddhiCompiler.parseQuery(query)); |
| 81 | } |
| 82 | |
| 83 | public void addExecutionPlan(String addExecutionPlan) throws SiddhiPraserException { |
| 84 | for (ExecutionPlan executionPlan : SiddhiCompiler.parse(addExecutionPlan)) { |
| 85 | if (executionPlan instanceof StreamDefinition) { |
| 86 | defineStream((StreamDefinition) executionPlan); |
| 87 | } else { |
| 88 | addQuery((Query) executionPlan); |
| 89 | } |
| 90 | } |
| 91 | } |
| 92 | |
| 93 | public void addQuery(Query query) { |
| 94 | |
| 95 | List<QueryEventStream> queryEventStreamList = query.getInputStream().constructQueryEventStreamList(streamDefinitionMap, new ArrayList<QueryEventStream>()); |
| 96 | initQuery(query, queryEventStreamList); |
| 97 | |
| 98 | QueryProjector queryProjector = new QueryProjector(query.getOutputStreamId(), query.getProjector(), queryEventStreamList); |
| 99 | StreamDefinition outputStreamDefinition = queryProjector.getOutputStreamDefinition(); |
| 100 | defineStream(outputStreamDefinition); |
| 101 | |
| 102 | List<StreamReceiver> streamReceiverList = StreamParser.parseStream(query.getInputStream(), queryEventStreamList, queryProjector, threadPoolExecutor); |
| 103 | queryProjector.setStreamJunction(streamJunctionMap.get(outputStreamDefinition.getStreamId())); |
| 104 | for (StreamReceiver streamReceiver : streamReceiverList) { |
| 105 | StreamJunction streamJunction = streamJunctionMap.get(streamReceiver.getStreamId()); |
| 106 | streamJunction.addEventFlow(streamReceiver); |
| 107 | } |
| 108 | |
| 109 | } |
| 110 | |
| 111 | private void initQuery(Query query, List<QueryEventStream> queryEventStreamList) { |
| 112 | |
| 113 | // Map<String, StreamDefinition> queryStreamDefinitionMap = getQueryStreamDefinitionMap(query.getInputStream().getStreamIds()); |
| 114 | //populate projection for * case |
| 115 | List<OutputAttribute> attributeList = query.getProjector().getProjectionList(); |
| 116 | if (attributeList.size() == 0) { |
| 117 | for (QueryEventStream queryEventStream : queryEventStreamList) { |
| 118 | for (Attribute attribute : queryEventStream.getStreamDefinition().getAttributeList()) { |
| 119 | attributeList.add(new SimpleAttribute(queryEventStream.getReferenceStreamId() + "_" + attribute.getName(), Expression.variable(queryEventStream.getReferenceStreamId(), attribute.getName()))); |
| 120 | } |
| 121 | } |
| 122 | } |
| 123 | |
| 124 | } |
| 125 | |
| 126 | private Map<String, StreamDefinition> getQueryStreamDefinitionMap(List<String> streamIds) { |
| 127 | Map<String, StreamDefinition> map = new HashMap<String, StreamDefinition>(); |
| 128 | for (String streamId : streamIds) { |
| 129 | map.put(streamId, streamDefinitionMap.get(streamId)); |
| 130 | } |
| 131 | return map; |
| 132 | } |
| 133 | |
| 134 | |
| 135 | // private void checkQuery(Query query) { |
| 136 | // List<String> streamIds = query.getInputStream().getStreamIds(); |
| 137 | // ////System.out.println(streamIds); |
| 138 | // } |
| 139 | |
| 140 | public InputHandler getInputHandler(String streamId) { |
| 141 | return new InputHandler(streamId, streamJunctionMap.get(streamId)); |
| 142 | } |
| 143 | |
| 144 | public void addCallback(String streamId, Callback callback) { |
| 145 | callback.setStreamId(streamId); |
| 146 | callback.setThreadPoolExecutor(threadPoolExecutor); |
| 147 | streamJunctionMap.get(streamId).addEventFlow(callback); |
| 148 | } |
| 149 | } |