001 /* 002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading.flow.stream; 022 023 import java.io.IOException; 024 import java.util.Iterator; 025 026 import cascading.CascadingException; 027 import cascading.flow.FlowProcess; 028 import cascading.operation.Buffer; 029 import cascading.pipe.Every; 030 import cascading.pipe.OperatorException; 031 import cascading.tuple.Fields; 032 import cascading.tuple.Tuple; 033 import cascading.tuple.TupleEntry; 034 import cascading.tuple.TupleEntryCollector; 035 import cascading.tuple.TupleEntryIterator; 036 import cascading.tuple.Tuples; 037 038 /** 039 * 040 */ 041 public class BufferEveryWindow extends EveryStage<Grouping<TupleEntry, TupleEntryIterator>> implements OpenWindow 042 { 043 Buffer buffer; 044 045 public BufferEveryWindow( FlowProcess flowProcess, Every every ) 046 { 047 super( flowProcess, every ); 048 } 049 050 @Override 051 public void initialize() 052 { 053 super.initialize(); 054 055 buffer = every.getBuffer(); 056 057 outputCollector = new TupleEntryCollector( getOperationDeclaredFields() ) 058 { 059 @Override 060 protected void collect( TupleEntry resultEntry ) throws IOException 061 { 062 Tuple outgoing = outgoingBuilder.makeResult( incomingEntry.getTuple(), resultEntry.getTuple() ); 063 064 outgoingEntry.setTuple( outgoing ); 065 066 try 067 { 068 next.receive( BufferEveryWindow.this, outgoingEntry ); 069 } 070 finally 071 { 072 Tuples.asModifiable( outgoing ); 073 } 074 } 075 }; 076 } 077 078 @Override 079 protected Fields getIncomingPassThroughFields() 080 { 081 return incomingScopes.get( 0 ).getIncomingBufferPassThroughFields(); 082 } 083 084 @Override 085 protected Fields getIncomingArgumentsFields() 086 { 087 return incomingScopes.get( 0 ).getIncomingBufferArgumentFields(); 088 } 089 090 @Override 091 protected Fields getOutgoingSelector() 092 { 093 return outgoingScopes.get( 0 ).getOutGroupingSelector(); 094 } 095 096 @Override 097 public void start( Duct previous ) 098 { 099 next.start( this ); 100 } 101 102 @Override 103 public void receive( Duct previous, final Grouping<TupleEntry, TupleEntryIterator> grouping ) 104 { 105 try 106 { 107 // we want to null out any 'values' before and after the iterator begins/ends 108 // this allows buffers to emit tuples before next() and when hasNext() return false; 109 final TupleEntry tupleEntry = grouping.joinIterator.getTupleEntry(); 110 incomingEntry = tupleEntry; 111 112 // if Fields.NONE are declared on the CoGroup, we don't provide arguments, only the joinerClosure 113 if( !tupleEntry.getFields().isNone() ) 114 { 115 final Tuple valueNulledTuple = Tuples.setOnEmpty( tupleEntry, grouping.key ); 116 tupleEntry.setTuple( valueNulledTuple ); 117 118 operationCall.setArgumentsIterator( createArgumentsIterator( grouping, tupleEntry, valueNulledTuple ) ); 119 } 120 121 operationCall.setOutputCollector( outputCollector ); 122 operationCall.setJoinerClosure( grouping.joinerClosure ); 123 operationCall.setGroup( grouping.key ); 124 125 buffer.operate( flowProcess, operationCall ); 126 } 127 catch( CascadingException exception ) 128 { 129 handleException( exception, argumentsEntry ); 130 } 131 catch( Throwable throwable ) 132 { 133 handleException( new OperatorException( every, "operator Every failed executing operation: " + every.getOperation(), throwable ), argumentsEntry ); 134 } 135 } 136 137 private Iterator<TupleEntry> createArgumentsIterator( final Grouping<TupleEntry, TupleEntryIterator> grouping, final TupleEntry tupleEntry, final Tuple valueNulledTuple ) 138 { 139 return new Iterator<TupleEntry>() 140 { 141 public boolean hasNext() 142 { 143 boolean hasNext = grouping.joinIterator.hasNext(); 144 145 if( !hasNext && !operationCall.isRetainValues() ) 146 tupleEntry.setTuple( valueNulledTuple ); // null out footer entries 147 148 return hasNext; 149 } 150 151 public TupleEntry next() 152 { 153 argumentsEntry.setTuple( argumentsBuilder.makeResult( grouping.joinIterator.next().getTuple(), null ) ); 154 155 return argumentsEntry; 156 } 157 158 public void remove() 159 { 160 grouping.joinIterator.remove(); 161 } 162 }; 163 } 164 }